commit for merge
This commit is contained in:
@@ -1,14 +1,11 @@
|
||||
import mutagen.id3
|
||||
import requests
|
||||
import os.path
|
||||
import pandas as pd
|
||||
from mutagen.easyid3 import EasyID3
|
||||
from pydub import AudioSegment
|
||||
import json
|
||||
import logging
|
||||
|
||||
import musify
|
||||
import youtube_music
|
||||
from scraping import musify, youtube_music
|
||||
|
||||
"""
|
||||
https://en.wikipedia.org/wiki/ID3
|
||||
@@ -16,40 +13,13 @@ https://mutagen.readthedocs.io/en/latest/user/id3.html
|
||||
|
||||
# to get all valid keys
|
||||
from mutagen.easyid3 import EasyID3
|
||||
print("\n".join(EasyID3.valid_keys.keys()))
|
||||
print(EasyID3.valid_keys.keys())
|
||||
"""
|
||||
|
||||
|
||||
def write_metadata(row, file_path):
|
||||
# only convert the file to the proper format if mutagen doesn't work with it due to time
|
||||
try:
|
||||
audiofile = EasyID3(file_path)
|
||||
except mutagen.id3.ID3NoHeaderError:
|
||||
AudioSegment.from_file(file_path).export(file_path, format="mp3")
|
||||
audiofile = EasyID3(file_path)
|
||||
|
||||
valid_keys = list(EasyID3.valid_keys.keys())
|
||||
|
||||
for key in list(row.keys()):
|
||||
if type(row[key]) == list or key in valid_keys and not pd.isna(row[key]):
|
||||
if type(row[key]) == int or type(row[key]) == float:
|
||||
row[key] = str(row[key])
|
||||
audiofile[key] = row[key]
|
||||
|
||||
logging.info("saving")
|
||||
audiofile.save(file_path, v1=2)
|
||||
|
||||
|
||||
def path_stuff(path: str, file_: str):
|
||||
# returns true if it shouldn't be downloaded
|
||||
if os.path.exists(file_):
|
||||
logging.info(f"'{file_}' does already exist, thus not downloading.")
|
||||
return True
|
||||
os.makedirs(path, exist_ok=True)
|
||||
return False
|
||||
|
||||
|
||||
class Download:
|
||||
<<<<<<< HEAD
|
||||
def __init__(self, session: requests.Session = requests.Session(), file: str = ".cache3.csv", temp: str = "temp",
|
||||
base_path: str = ""):
|
||||
self.session = session
|
||||
@@ -59,16 +29,22 @@ class Download:
|
||||
}
|
||||
self.temp = temp
|
||||
self.file = file
|
||||
=======
|
||||
def __init__(self, database, logger: logging.Logger, proxies: dict = None, base_path: str = ""):
|
||||
if proxies is not None:
|
||||
musify.set_proxy(proxies)
|
||||
|
||||
self.dataframe = pd.read_csv(os.path.join(self.temp, self.file), index_col=0)
|
||||
self.database = database
|
||||
self.logger = logger
|
||||
>>>>>>> 63f30bffbae20ec3fc368a6093b28e56f0230318
|
||||
|
||||
for idx, row in self.dataframe.iterrows():
|
||||
row['artist'] = json.loads(row['artist'].replace("'", '"'))
|
||||
for row in database.get_tracks_to_download():
|
||||
row['artist'] = [i['name'] for i in row['artists']]
|
||||
row['file'] = os.path.join(base_path, row['file'])
|
||||
row['path'] = os.path.join(base_path, row['path'])
|
||||
|
||||
if path_stuff(row['path'], row['file']):
|
||||
write_metadata(row, row['file'])
|
||||
if self.path_stuff(row['path'], row['file']):
|
||||
self.write_metadata(row, row['file'])
|
||||
continue
|
||||
|
||||
download_success = None
|
||||
@@ -79,10 +55,41 @@ class Download:
|
||||
download_success = youtube_music.download(row)
|
||||
|
||||
if download_success == -1:
|
||||
logging.warning(f"couldn't download {row.url} from {row.src}")
|
||||
self.logger.warning(f"couldn't download {row['url']} from {row['src']}")
|
||||
continue
|
||||
|
||||
write_metadata(row, row['file'])
|
||||
self.write_metadata(row, row['file'])
|
||||
|
||||
def write_metadata(self, row, file_path):
|
||||
if not os.path.exists(file_path):
|
||||
self.logger.warning("something went really wrong")
|
||||
return False
|
||||
|
||||
# only convert the file to the proper format if mutagen doesn't work with it due to time
|
||||
try:
|
||||
audiofile = EasyID3(file_path)
|
||||
except mutagen.id3.ID3NoHeaderError:
|
||||
AudioSegment.from_file(file_path).export(file_path, format="mp3")
|
||||
audiofile = EasyID3(file_path)
|
||||
|
||||
valid_keys = list(EasyID3.valid_keys.keys())
|
||||
|
||||
for key in list(row.keys()):
|
||||
if key in valid_keys and row[key] is not None:
|
||||
if type(row[key]) != list:
|
||||
row[key] = str(row[key])
|
||||
audiofile[key] = row[key]
|
||||
|
||||
self.logger.info("saving")
|
||||
audiofile.save(file_path, v1=2)
|
||||
|
||||
def path_stuff(self, path: str, file_: str):
|
||||
# returns true if it shouldn't be downloaded
|
||||
if os.path.exists(file_):
|
||||
self.logger.info(f"'{file_}' does already exist, thus not downloading.")
|
||||
return True
|
||||
os.makedirs(path, exist_ok=True)
|
||||
return False
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@@ -1,55 +1,57 @@
|
||||
import json
|
||||
import os.path
|
||||
import pandas as pd
|
||||
import requests
|
||||
import os
|
||||
import logging
|
||||
|
||||
import musify
|
||||
import youtube_music
|
||||
from scraping import musify, youtube_music
|
||||
|
||||
|
||||
class Download:
|
||||
<<<<<<< HEAD
|
||||
def __init__(self, metadata_csv: str = ".cache1.csv", session: requests.Session = requests.Session(),
|
||||
file: str = ".cache2.csv", temp: str = "temp") -> None:
|
||||
self.temp = temp
|
||||
self.metadata = pd.read_csv(os.path.join(self.temp, metadata_csv), index_col=0)
|
||||
=======
|
||||
def __init__(self, database, logger: logging.Logger, music_dir: str, proxies: dict = None) -> None:
|
||||
self.music_dir = music_dir
|
||||
self.database = database
|
||||
self.logger = logger
|
||||
if proxies is not None:
|
||||
musify.set_proxy(proxies)
|
||||
>>>>>>> 63f30bffbae20ec3fc368a6093b28e56f0230318
|
||||
|
||||
self.urls = []
|
||||
|
||||
for idx, row in self.metadata.iterrows():
|
||||
row['artist'] = json.loads(row['artist'].replace("'", '"'))
|
||||
for row in self.database.get_tracks_without_src():
|
||||
row['artists'] = [artist['name'] for artist in row['artists']]
|
||||
|
||||
# check musify
|
||||
musify_url = musify.get_musify_url(row)
|
||||
if musify_url is not None:
|
||||
self.add_url(musify_url, 'musify', dict(row))
|
||||
id_ = row['id']
|
||||
if os.path.exists(os.path.join(self.music_dir, row['file'])):
|
||||
self.logger.info(f"skipping the fetching of the download links, cuz {row['file']} already exists.")
|
||||
continue
|
||||
|
||||
# check YouTube
|
||||
youtube_url = youtube_music.get_youtube_url(row)
|
||||
if youtube_url is not None:
|
||||
self.add_url(youtube_url, 'youtube', dict(row))
|
||||
self.add_url(youtube_url, 'youtube', id_)
|
||||
continue
|
||||
|
||||
# check musify
|
||||
musify_url = musify.get_musify_url(row)
|
||||
if musify_url is not None:
|
||||
self.add_url(musify_url, 'musify', id_)
|
||||
continue
|
||||
|
||||
# check musify again, but with a different methode that takes longer
|
||||
musify_url = musify.get_musify_url_slow(row)
|
||||
if musify_url is not None:
|
||||
self.add_url(musify_url, 'musify', dict(row))
|
||||
self.add_url(musify_url, 'musify', id_)
|
||||
continue
|
||||
|
||||
logging.warning(f"Didn't find any sources for {row['title']}")
|
||||
self.logger.warning(f"Didn't find any sources for {row['title']}")
|
||||
|
||||
self.dump_urls(file)
|
||||
|
||||
def add_url(self, url: str, src: str, row: dict):
|
||||
row['url'] = url
|
||||
row['src'] = src
|
||||
|
||||
self.urls.append(row)
|
||||
|
||||
def dump_urls(self, file: str = ".cache2.csv"):
|
||||
df = pd.DataFrame(self.urls)
|
||||
df.to_csv(os.path.join(self.temp, file))
|
||||
def add_url(self, url: str, src: str, id_: str):
|
||||
self.database.set_download_data(id_, url, src)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
@@ -60,4 +62,4 @@ if __name__ == "__main__":
|
||||
|
||||
s = requests.Session()
|
||||
s.proxies = proxies
|
||||
download = Download(session=s)
|
||||
download = Download()
|
||||
|
83
src/main.py
83
src/main.py
@@ -1,4 +1,11 @@
|
||||
<<<<<<< HEAD
|
||||
import metadata
|
||||
=======
|
||||
from metadata.database import Database
|
||||
from metadata.download import MetadataDownloader
|
||||
import metadata.download
|
||||
import metadata.search
|
||||
>>>>>>> 63f30bffbae20ec3fc368a6093b28e56f0230318
|
||||
import download_links
|
||||
import url_to_path
|
||||
import download
|
||||
@@ -6,18 +13,40 @@ import download
|
||||
import logging
|
||||
import requests
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
TEMP = "temp"
|
||||
STEP_ONE_CACHE = ".cache1.csv"
|
||||
STEP_TWO_CACHE = ".cache2.csv"
|
||||
STEP_THREE_CACHE = ".cache3.csv"
|
||||
TEMP_FOLDER = "music-downloader"
|
||||
DATABASE_FILE = "metadata.db"
|
||||
DATABASE_STRUCTURE_FILE = "database_structure.sql"
|
||||
DATABASE_STRUCTURE_FALLBACK = "https://raw.githubusercontent.com/HeIIow2/music-downloader/new_metadata/assets/database_structure.sql"
|
||||
|
||||
SEARCH_LOGGER = logging.getLogger("mb-cli")
|
||||
DATABASE_LOGGER = logging.getLogger("database")
|
||||
METADATA_DOWNLOAD_LOGGER = logging.getLogger("metadata-download")
|
||||
URL_DOWNLOAD_LOGGER = logging.getLogger("ling-download")
|
||||
PATH_LOGGER = logging.getLogger("create-paths")
|
||||
DOWNLOAD_LOGGER = logging.getLogger("download")
|
||||
|
||||
NOT_A_GENRE = ".", "..", "misc_scripts", "Music", "script", ".git", ".idea"
|
||||
MUSIC_DIR = os.path.expanduser('~/Music')
|
||||
TOR = False
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
|
||||
temp_dir = os.path.join(tempfile.gettempdir(), TEMP_FOLDER)
|
||||
if not os.path.exists(temp_dir):
|
||||
os.mkdir(temp_dir)
|
||||
|
||||
database = Database(os.path.join(temp_dir, DATABASE_FILE),
|
||||
os.path.join(temp_dir, DATABASE_STRUCTURE_FILE),
|
||||
DATABASE_STRUCTURE_FALLBACK,
|
||||
DATABASE_LOGGER,
|
||||
reset_anyways=True)
|
||||
|
||||
>>>>>>> 63f30bffbae20ec3fc368a6093b28e56f0230318
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
|
||||
def get_existing_genre():
|
||||
@@ -29,28 +58,34 @@ def get_existing_genre():
|
||||
return valid_directories
|
||||
|
||||
|
||||
<<<<<<< HEAD
|
||||
def search_for_metadata(query: str):
|
||||
search = metadata.Search(query=query, temp=TEMP)
|
||||
=======
|
||||
def search_for_metadata():
|
||||
search = metadata.search.Search(logger=SEARCH_LOGGER)
|
||||
>>>>>>> 63f30bffbae20ec3fc368a6093b28e56f0230318
|
||||
|
||||
print(search.options)
|
||||
while True:
|
||||
input_ = input(
|
||||
"q to quit, ok to download, .. for previous options, . for current options, int for this element: ").lower()
|
||||
"q to quit, .. for previous options, int for this element, str to search for query, ok to download\n")
|
||||
input_.strip()
|
||||
if input_ == "q":
|
||||
exit(0)
|
||||
if input_ == "ok":
|
||||
return search
|
||||
if input_ == ".":
|
||||
print(search.options)
|
||||
continue
|
||||
if input_ == "..":
|
||||
if input_.lower() == "ok":
|
||||
break
|
||||
if input_.lower() == "q":
|
||||
break
|
||||
if input_.lower() == "..":
|
||||
print()
|
||||
print(search.get_previous_options())
|
||||
continue
|
||||
if input_.isdigit():
|
||||
print()
|
||||
print(search.choose(int(input_)))
|
||||
continue
|
||||
print()
|
||||
print(search.search_from_query(input_))
|
||||
|
||||
return search.current_option
|
||||
|
||||
def get_genre():
|
||||
existing_genres = get_existing_genre()
|
||||
@@ -83,21 +118,31 @@ def cli(start_at: int = 0):
|
||||
logging.info(f"{genre} has been set as genre.")
|
||||
|
||||
if start_at <= 0:
|
||||
search = search_for_metadata(query=input("initial query: "))
|
||||
search = search_for_metadata()
|
||||
logging.info("Starting Downloading of metadata")
|
||||
search.download(file=STEP_ONE_CACHE)
|
||||
metadata_downloader = MetadataDownloader(database, METADATA_DOWNLOAD_LOGGER)
|
||||
metadata_downloader.download(search)
|
||||
|
||||
if start_at <= 1:
|
||||
<<<<<<< HEAD
|
||||
logging.info("Fetching Download Links")
|
||||
download_links.Download(file=STEP_TWO_CACHE, metadata_csv=STEP_ONE_CACHE, temp=TEMP, session=session)
|
||||
=======
|
||||
logging.info("creating Paths")
|
||||
url_to_path.UrlPath(database, PATH_LOGGER, genre=genre)
|
||||
>>>>>>> 63f30bffbae20ec3fc368a6093b28e56f0230318
|
||||
|
||||
if start_at <= 2:
|
||||
logging.info("creating Paths")
|
||||
url_to_path.UrlPath(genre=genre)
|
||||
logging.info("Fetching Download Links")
|
||||
download_links.Download(database, METADATA_DOWNLOAD_LOGGER, MUSIC_DIR, proxies=proxies)
|
||||
|
||||
if start_at <= 3:
|
||||
logging.info("starting to download the mp3's")
|
||||
<<<<<<< HEAD
|
||||
download.Download(session=session, file=STEP_THREE_CACHE, temp=TEMP, base_path=MUSIC_DIR)
|
||||
=======
|
||||
download.Download(database, DOWNLOAD_LOGGER, proxies=proxies, base_path=MUSIC_DIR)
|
||||
>>>>>>> 63f30bffbae20ec3fc368a6093b28e56f0230318
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
251
src/metadata/database.py
Normal file
251
src/metadata/database.py
Normal file
@@ -0,0 +1,251 @@
|
||||
import sqlite3
|
||||
import os
|
||||
import logging
|
||||
import json
|
||||
import requests
|
||||
|
||||
|
||||
class Database:
|
||||
def __init__(self, path_to_db: str, db_structure: str, db_structure_fallback: str, logger: logging.Logger, reset_anyways: bool = False):
|
||||
self.logger = logger
|
||||
self.path_to_db = path_to_db
|
||||
|
||||
self.connection = sqlite3.connect(self.path_to_db)
|
||||
self.cursor = self.connection.cursor()
|
||||
|
||||
# init database
|
||||
self.init_db(database_structure=db_structure, database_structure_fallback=db_structure_fallback, reset_anyways=reset_anyways)
|
||||
|
||||
def init_db(self, database_structure: str, database_structure_fallback: str, reset_anyways: bool = False):
|
||||
# check if db exists
|
||||
exists = True
|
||||
try:
|
||||
query = 'SELECT * FROM track;'
|
||||
self.cursor.execute(query)
|
||||
_ = self.cursor.fetchall()
|
||||
except sqlite3.OperationalError:
|
||||
exists = False
|
||||
|
||||
if not exists:
|
||||
self.logger.info("Database does not exist yet.")
|
||||
|
||||
if reset_anyways or not exists:
|
||||
# reset the database if reset_anyways is true or if an error has been thrown previously.
|
||||
self.logger.info("Creating/Reseting Database.")
|
||||
|
||||
if not os.path.exists(database_structure):
|
||||
self.logger.info("database structure file doesn't exist yet, fetching from github")
|
||||
r = requests.get(database_structure_fallback)
|
||||
|
||||
with open(database_structure, "w") as f:
|
||||
f.write(r.text)
|
||||
|
||||
# read the file
|
||||
with open(database_structure, "r") as database_structure_file:
|
||||
query = database_structure_file.read()
|
||||
self.cursor.executescript(query)
|
||||
self.connection.commit()
|
||||
|
||||
def add_artist(
|
||||
self,
|
||||
musicbrainz_artistid: str,
|
||||
artist: str = None
|
||||
):
|
||||
query = "INSERT OR REPLACE INTO artist (id, name) VALUES (?, ?);"
|
||||
values = musicbrainz_artistid, artist
|
||||
|
||||
self.cursor.execute(query, values)
|
||||
self.connection.commit()
|
||||
|
||||
def add_release_group(
|
||||
self,
|
||||
musicbrainz_releasegroupid: str,
|
||||
artist_ids: list,
|
||||
albumartist: str = None,
|
||||
albumsort: int = None,
|
||||
musicbrainz_albumtype: str = None,
|
||||
compilation: str = None,
|
||||
album_artist_id: str = None
|
||||
):
|
||||
# add adjacency
|
||||
adjacency_list = []
|
||||
for artist_id in artist_ids:
|
||||
adjacency_list.append((artist_id, musicbrainz_releasegroupid))
|
||||
adjacency_values = tuple(adjacency_list)
|
||||
adjacency_query = "INSERT OR REPLACE INTO artist_release_group (artist_id, release_group_id) VALUES (?, ?);"
|
||||
self.cursor.executemany(adjacency_query, adjacency_values)
|
||||
self.connection.commit()
|
||||
|
||||
# add release group
|
||||
query = "INSERT OR REPLACE INTO release_group (id, albumartist, albumsort, musicbrainz_albumtype, compilation, album_artist_id) VALUES (?, ?, ?, ?, ?, ?);"
|
||||
values = musicbrainz_releasegroupid, albumartist, albumsort, musicbrainz_albumtype, compilation, album_artist_id
|
||||
self.cursor.execute(query, values)
|
||||
self.connection.commit()
|
||||
|
||||
def add_release(
|
||||
self,
|
||||
musicbrainz_albumid: str,
|
||||
release_group_id: str,
|
||||
title: str = None,
|
||||
copyright_: str = None,
|
||||
album_status: str = None,
|
||||
language: str = None,
|
||||
year: str = None,
|
||||
date: str = None,
|
||||
country: str = None,
|
||||
barcode: str = None
|
||||
):
|
||||
query = "INSERT OR REPLACE INTO release_ (id, release_group_id, title, copyright, album_status, language, year, date, country, barcode) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"
|
||||
values = musicbrainz_albumid, release_group_id, title, copyright_, album_status, language, year, date, country, barcode
|
||||
|
||||
self.cursor.execute(query, values)
|
||||
self.connection.commit()
|
||||
|
||||
def add_track(
|
||||
self,
|
||||
musicbrainz_releasetrackid: str,
|
||||
musicbrainz_albumid: str,
|
||||
feature_aritsts: list,
|
||||
tracknumber: str = None,
|
||||
track: str = None,
|
||||
isrc: str = None
|
||||
):
|
||||
# add adjacency
|
||||
adjacency_list = []
|
||||
for artist_id in feature_aritsts:
|
||||
adjacency_list.append((artist_id, musicbrainz_releasetrackid))
|
||||
adjacency_values = tuple(adjacency_list)
|
||||
adjacency_query = "INSERT OR REPLACE INTO artist_track (artist_id, track_id) VALUES (?, ?);"
|
||||
self.cursor.executemany(adjacency_query, adjacency_values)
|
||||
self.connection.commit()
|
||||
|
||||
# add track
|
||||
query = "INSERT OR REPLACE INTO track (id, release_id, track, isrc, tracknumber) VALUES (?, ?, ?, ?, ?);"
|
||||
values = musicbrainz_releasetrackid, musicbrainz_albumid, track, isrc, tracknumber
|
||||
self.cursor.execute(query, values)
|
||||
self.connection.commit()
|
||||
|
||||
@staticmethod
|
||||
def get_custom_track_query(custom_where: list) -> str:
|
||||
where_args = [
|
||||
"track.release_id == release_.id",
|
||||
"release_group.id == release_.release_group_id",
|
||||
"artist_track.artist_id == artist.id",
|
||||
"artist_track.track_id == track.id"
|
||||
]
|
||||
where_args.extend(custom_where)
|
||||
|
||||
where_arg = " AND ".join(where_args)
|
||||
query = f"""
|
||||
SELECT DISTINCT
|
||||
json_object(
|
||||
'artists', json_group_array(
|
||||
(
|
||||
SELECT DISTINCT json_object(
|
||||
'id', artist.id,
|
||||
'name', artist.name
|
||||
)
|
||||
)
|
||||
),
|
||||
'id', track.id,
|
||||
'tracknumber', track.tracknumber,
|
||||
'titlesort ', track.tracknumber,
|
||||
'musicbrainz_releasetrackid', track.id,
|
||||
'musicbrainz_albumid', release_.id,
|
||||
'title', track.track,
|
||||
'isrc', track.isrc,
|
||||
'album', release_.title,
|
||||
'copyright', release_.copyright,
|
||||
'album_status', release_.album_status,
|
||||
'language', release_.language,
|
||||
'year', release_.year,
|
||||
'date', release_.date,
|
||||
'country', release_.country,
|
||||
'barcode', release_.barcode,
|
||||
'albumartist', release_group.albumartist,
|
||||
'albumsort', release_group.albumsort,
|
||||
'musicbrainz_albumtype', release_group.musicbrainz_albumtype,
|
||||
'compilation', release_group.compilation,
|
||||
'album_artist_id', release_group.album_artist_id,
|
||||
'path', track.path,
|
||||
'file', track.file,
|
||||
'genre', track.genre,
|
||||
'url', track.url,
|
||||
'src', track.src
|
||||
)
|
||||
FROM track, release_, release_group,artist, artist_track
|
||||
WHERE
|
||||
{where_arg}
|
||||
GROUP BY track.id;
|
||||
"""
|
||||
return query
|
||||
|
||||
def get_custom_track(self, custom_where: list):
|
||||
query = Database.get_custom_track_query(custom_where=custom_where)
|
||||
return [json.loads(i[0]) for i in self.cursor.execute(query)]
|
||||
|
||||
def get_track_metadata(self, musicbrainz_releasetrackid: str):
|
||||
# this would be vulnerable if musicbrainz_releasetrackid would be user input
|
||||
resulting_tracks = self.get_custom_track([f'track.id == "{musicbrainz_releasetrackid}"'])
|
||||
if len(resulting_tracks) != 1:
|
||||
return -1
|
||||
|
||||
return resulting_tracks[0]
|
||||
|
||||
def get_tracks_to_download(self):
|
||||
return self.get_custom_track(['track.downloaded == 0'])
|
||||
|
||||
def get_tracks_without_src(self):
|
||||
return self.get_custom_track(["(track.url IS NULL OR track.src IS NULL)"])
|
||||
|
||||
def get_tracks_without_isrc(self):
|
||||
return self.get_custom_track(["track.isrc IS NULL"])
|
||||
|
||||
def get_tracks_without_filepath(self):
|
||||
return self.get_custom_track(["(track.file IS NULL OR track.path IS NULL OR track.genre IS NULL)"])
|
||||
|
||||
def update_download_status(self, track_id: str):
|
||||
pass
|
||||
|
||||
def set_download_data(self, track_id: str, url: str, src: str):
|
||||
query = f"""
|
||||
UPDATE track
|
||||
SET url = ?,
|
||||
src = ?
|
||||
WHERE '{track_id}' == id;
|
||||
"""
|
||||
self.cursor.execute(query, (url, src))
|
||||
self.connection.commit()
|
||||
|
||||
def set_filepath(self, track_id: str, file: str, path: str, genre: str):
|
||||
query = f"""
|
||||
UPDATE track
|
||||
SET file = ?,
|
||||
path = ?,
|
||||
genre = ?
|
||||
WHERE '{track_id}' == id;
|
||||
"""
|
||||
self.cursor.execute(query, (file, path, genre))
|
||||
self.connection.commit()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import tempfile
|
||||
|
||||
temp_folder = "music-downloader"
|
||||
temp_dir = os.path.join(tempfile.gettempdir(), temp_folder)
|
||||
if not os.path.exists(temp_dir):
|
||||
os.mkdir(temp_dir)
|
||||
|
||||
temp_dir = get_temp_dir()
|
||||
DATABASE_FILE = "metadata.db"
|
||||
DATABASE_STRUCTURE_FILE = "database_structure.sql"
|
||||
db_path = os.path.join(TEMP_DIR, DATABASE_FILE)
|
||||
|
||||
logging.basicConfig()
|
||||
|
||||
logger = logging.getLogger("database")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
database = Database(os.path.join(temp_dir, "metadata.db"), os.path.join(temp_dir, "database_structure.sql"), logger,
|
||||
reset_anyways=True)
|
56
src/metadata/database_structure.sql
Normal file
56
src/metadata/database_structure.sql
Normal file
@@ -0,0 +1,56 @@
|
||||
DROP TABLE IF EXISTS artist;
|
||||
CREATE TABLE artist (
|
||||
id TEXT PRIMARY KEY NOT NULL,
|
||||
name TEXT
|
||||
);
|
||||
|
||||
DROP TABLE IF EXISTS artist_release_group;
|
||||
CREATE TABLE artist_release_group (
|
||||
artist_id TEXT NOT NULL,
|
||||
release_group_id TEXT NOT NULL
|
||||
);
|
||||
|
||||
DROP TABLE IF EXISTS artist_track;
|
||||
CREATE TABLE artist_track (
|
||||
artist_id TEXT NOT NULL,
|
||||
track_id TEXT NOT NULL
|
||||
);
|
||||
|
||||
DROP TABLE IF EXISTS release_group;
|
||||
CREATE TABLE release_group (
|
||||
id TEXT PRIMARY KEY NOT NULL,
|
||||
albumartist TEXT,
|
||||
albumsort INT,
|
||||
musicbrainz_albumtype TEXT,
|
||||
compilation TEXT,
|
||||
album_artist_id TEXT
|
||||
);
|
||||
|
||||
DROP TABLE IF EXISTS release_;
|
||||
CREATE TABLE release_ (
|
||||
id TEXT PRIMARY KEY NOT NULL,
|
||||
release_group_id TEXT NOT NULL,
|
||||
title TEXT,
|
||||
copyright TEXT,
|
||||
album_status TEXT,
|
||||
language TEXT,
|
||||
year TEXT,
|
||||
date TEXT,
|
||||
country TEXT,
|
||||
barcode TEXT
|
||||
);
|
||||
|
||||
DROP TABLE IF EXISTS track;
|
||||
CREATE TABLE track (
|
||||
id TEXT PRIMARY KEY NOT NULL,
|
||||
downloaded BOOLEAN NOT NULL DEFAULT 0,
|
||||
release_id TEXT NOT NULL,
|
||||
track TEXT,
|
||||
tracknumber TEXT,
|
||||
isrc TEXT,
|
||||
genre TEXT,
|
||||
path TEXT,
|
||||
file TEXT,
|
||||
url TEXT,
|
||||
src TEXT
|
||||
);
|
360
src/metadata/download.py
Normal file
360
src/metadata/download.py
Normal file
@@ -0,0 +1,360 @@
|
||||
from typing import List
|
||||
import musicbrainzngs
|
||||
import logging
|
||||
|
||||
try:
|
||||
from object_handeling import get_elem_from_obj, parse_music_brainz_date
|
||||
|
||||
except ModuleNotFoundError:
|
||||
from metadata.object_handeling import get_elem_from_obj, parse_music_brainz_date
|
||||
|
||||
# I don't know if it would be feesable to set up my own mb instance
|
||||
# https://github.com/metabrainz/musicbrainz-docker
|
||||
|
||||
mb_log = logging.getLogger("musicbrainzngs")
|
||||
mb_log.setLevel(logging.WARNING)
|
||||
musicbrainzngs.set_useragent("metadata receiver", "0.1", "https://github.com/HeIIow2/music-downloader")
|
||||
|
||||
|
||||
# IMPORTANT DOCUMENTATION WHICH CONTAINS FOR EXAMPLE THE INCLUDES
|
||||
# https://python-musicbrainzngs.readthedocs.io/en/v0.7.1/api/#getting-data
|
||||
|
||||
|
||||
class MetadataDownloader:
|
||||
def __init__(self, database, logger: logging.Logger):
|
||||
self.database = database
|
||||
self.logger = logger
|
||||
|
||||
class Artist:
|
||||
def __init__(
|
||||
self,
|
||||
database,
|
||||
logger,
|
||||
musicbrainz_artistid: str,
|
||||
release_groups: List = [],
|
||||
new_release_groups: bool = True
|
||||
):
|
||||
self.database = database
|
||||
self.logger = logger
|
||||
"""
|
||||
release_groups: list
|
||||
"""
|
||||
self.release_groups = release_groups
|
||||
|
||||
self.musicbrainz_artistid = musicbrainz_artistid
|
||||
|
||||
result = musicbrainzngs.get_artist_by_id(self.musicbrainz_artistid, includes=["release-groups", "releases"])
|
||||
artist_data = get_elem_from_obj(result, ['artist'], return_if_none={})
|
||||
|
||||
self.artist = get_elem_from_obj(artist_data, ['name'])
|
||||
|
||||
self.save()
|
||||
|
||||
# STARTING TO FETCH' RELEASE GROUPS. IMPORTANT: DON'T WRITE ANYTHING BESIDES THAT HERE
|
||||
if not new_release_groups:
|
||||
return
|
||||
# sort all release groups by date and add album sort to have them in chronological order.
|
||||
release_groups = artist_data['release-group-list']
|
||||
for i, release_group in enumerate(release_groups):
|
||||
release_groups[i]['first-release-date'] = parse_music_brainz_date(release_group['first-release-date'])
|
||||
release_groups.sort(key=lambda x: x['first-release-date'])
|
||||
|
||||
for i, release_group in enumerate(release_groups):
|
||||
self.release_groups.append(MetadataDownloader.ReleaseGroup(
|
||||
self.database,
|
||||
self.logger,
|
||||
musicbrainz_releasegroupid=release_group['id'],
|
||||
artists=[self],
|
||||
albumsort=i + 1
|
||||
))
|
||||
|
||||
def __str__(self):
|
||||
newline = "\n"
|
||||
return f"id: {self.musicbrainz_artistid}\nname: {self.artist}\n{newline.join([str(release_group) for release_group in self.release_groups])}"
|
||||
|
||||
def save(self):
|
||||
self.logger.info(f"artist: {self}")
|
||||
self.database.add_artist(
|
||||
musicbrainz_artistid=self.musicbrainz_artistid,
|
||||
artist=self.artist
|
||||
)
|
||||
|
||||
class ReleaseGroup:
|
||||
def __init__(
|
||||
self,
|
||||
database,
|
||||
logger,
|
||||
musicbrainz_releasegroupid: str,
|
||||
artists=[],
|
||||
albumsort: int = None,
|
||||
only_download_distinct_releases: bool = True,
|
||||
fetch_further: bool = True
|
||||
):
|
||||
self.database = database
|
||||
self.logger = logger
|
||||
"""
|
||||
split_artists: list -> if len > 1: album_artist=VariousArtists
|
||||
releases: list
|
||||
"""
|
||||
|
||||
self.musicbrainz_releasegroupid = musicbrainz_releasegroupid
|
||||
self.artists = artists
|
||||
self.releases = []
|
||||
|
||||
result = musicbrainzngs.get_release_group_by_id(musicbrainz_releasegroupid,
|
||||
includes=["artist-credits", "releases"])
|
||||
release_group_data = get_elem_from_obj(result, ['release-group'], return_if_none={})
|
||||
artist_datas = get_elem_from_obj(release_group_data, ['artist-credit'], return_if_none={})
|
||||
release_datas = get_elem_from_obj(release_group_data, ['release-list'], return_if_none={})
|
||||
|
||||
for artist_data in artist_datas:
|
||||
artist_id = get_elem_from_obj(artist_data, ['artist', 'id'])
|
||||
if artist_id is None:
|
||||
continue
|
||||
self.append_artist(artist_id)
|
||||
self.albumartist = "Various Artists" if len(self.artists) > 1 else self.artists[0].artist
|
||||
self.album_artist_id = None if self.albumartist == "Various Artists" else self.artists[
|
||||
0].musicbrainz_artistid
|
||||
|
||||
self.albumsort = albumsort
|
||||
self.musicbrainz_albumtype = get_elem_from_obj(release_group_data, ['primary-type'])
|
||||
self.compilation = "1" if self.musicbrainz_albumtype == "Compilation" else None
|
||||
|
||||
self.save()
|
||||
|
||||
if not fetch_further:
|
||||
return
|
||||
|
||||
if only_download_distinct_releases:
|
||||
self.append_distinct_releases(release_datas)
|
||||
else:
|
||||
self.append_all_releases(release_datas)
|
||||
|
||||
def __str__(self):
|
||||
newline = "\n"
|
||||
return f"{newline.join([str(release_group) for release_group in self.releases])}"
|
||||
|
||||
def save(self):
|
||||
self.logger.info(f"caching release_group {self}")
|
||||
self.database.add_release_group(
|
||||
musicbrainz_releasegroupid=self.musicbrainz_releasegroupid,
|
||||
artist_ids=[artist.musicbrainz_artistid for artist in self.artists],
|
||||
albumartist=self.albumartist,
|
||||
albumsort=self.albumsort,
|
||||
musicbrainz_albumtype=self.musicbrainz_albumtype,
|
||||
compilation=self.compilation,
|
||||
album_artist_id=self.album_artist_id
|
||||
)
|
||||
|
||||
def append_artist(self, artist_id: str):
|
||||
for existing_artist in self.artists:
|
||||
if artist_id == existing_artist.musicbrainz_artistid:
|
||||
return existing_artist
|
||||
new_artist = MetadataDownloader.Artist(self.database, self.logger, artist_id, release_groups=[self],
|
||||
new_release_groups=False)
|
||||
self.artists.append(new_artist)
|
||||
return new_artist
|
||||
|
||||
def append_release(self, release_data: dict):
|
||||
musicbrainz_albumid = get_elem_from_obj(release_data, ['id'])
|
||||
if musicbrainz_albumid is None:
|
||||
return
|
||||
self.releases.append(
|
||||
MetadataDownloader.Release(self.database, self.logger, musicbrainz_albumid, release_group=self))
|
||||
|
||||
def append_distinct_releases(self, release_datas: List[dict]):
|
||||
titles = {}
|
||||
|
||||
for release_data in release_datas:
|
||||
title = get_elem_from_obj(release_data, ['title'])
|
||||
if title is None:
|
||||
continue
|
||||
titles[title] = release_data
|
||||
|
||||
for key in titles:
|
||||
self.append_release(titles[key])
|
||||
|
||||
def append_all_releases(self, release_datas: List[dict]):
|
||||
for release_data in release_datas:
|
||||
self.append_release(release_data)
|
||||
|
||||
class Release:
    """One MusicBrainz release (a concrete album/EP/single edition), fetched eagerly.

    The constructor performs one MusicBrainz API call, caches the result
    via :meth:`save` and — unless ``fetch_furter`` is False — wraps every
    recording on the first medium in a ``MetadataDownloader.Track``.
    """

    def __init__(
            self,
            database,
            logger,
            musicbrainz_albumid: str,
            release_group=None,
            fetch_furter: bool = True
    ):
        # NOTE(review): "fetch_furter" is a typo of "fetch_further", but it is
        # passed by keyword elsewhere (see Track.__init__), so renaming it
        # here alone would break callers.
        self.database = database
        self.logger = logger
        """
        release_group: ReleaseGroup
        tracks: list
        """
        self.musicbrainz_albumid = musicbrainz_albumid
        self.release_group = release_group
        self.tracklist = []

        # single network round trip; everything below reads from this result
        result = musicbrainzngs.get_release_by_id(self.musicbrainz_albumid,
                                                  includes=["recordings", "labels", "release-groups"])
        release_data = get_elem_from_obj(result, ['release'], return_if_none={})
        label_data = get_elem_from_obj(release_data, ['label-info-list'], return_if_none={})
        # only the first medium's track list is considered
        recording_datas = get_elem_from_obj(release_data, ['medium-list', 0, 'track-list'], return_if_none=[])
        release_group_data = get_elem_from_obj(release_data, ['release-group'], return_if_none={})
        if self.release_group is None:
            # fetch_further=False keeps the group from recursing back into
            # its releases (we are already inside one of them)
            self.release_group = MetadataDownloader.ReleaseGroup(self.database, self.logger,
                                                                 musicbrainz_releasegroupid=get_elem_from_obj(
                                                                     release_group_data, ['id']), fetch_further=False)

        self.title = get_elem_from_obj(release_data, ['title'])
        # copyright holder is taken from the first label entry
        self.copyright = get_elem_from_obj(label_data, [0, 'label', 'name'])

        self.album_status = get_elem_from_obj(release_data, ['status'])
        self.language = get_elem_from_obj(release_data, ['text-representation', 'language'])
        # the release date string is "YYYY[-MM[-DD]]"; the year is its first segment
        self.year = get_elem_from_obj(release_data, ['date'], lambda x: x.split("-")[0])
        self.date = get_elem_from_obj(release_data, ['date'])
        self.country = get_elem_from_obj(release_data, ['country'])
        self.barcode = get_elem_from_obj(release_data, ['barcode'])

        self.save()
        if fetch_furter:
            self.append_recordings(recording_datas)

    def __str__(self):
        return f"{self.title} ©{self.copyright} {self.album_status}"

    def save(self):
        """Write this release's metadata into the local cache database."""
        self.logger.info(f"caching release {self}")
        self.database.add_release(
            musicbrainz_albumid=self.musicbrainz_albumid,
            release_group_id=self.release_group.musicbrainz_releasegroupid,
            title=self.title,
            copyright_=self.copyright,
            album_status=self.album_status,
            language=self.language,
            year=self.year,
            date=self.date,
            country=self.country,
            barcode=self.barcode
        )

    def append_recordings(self, recording_datas: dict):
        """Create a Track for every recording dict; track numbers are 1-based strings."""
        for i, recording_data in enumerate(recording_datas):
            musicbrainz_releasetrackid = get_elem_from_obj(recording_data, ['recording', 'id'])
            # skip entries without a recording id rather than caching garbage
            if musicbrainz_releasetrackid is None:
                continue

            self.tracklist.append(
                MetadataDownloader.Track(self.database, self.logger, musicbrainz_releasetrackid, self,
                                         track_number=str(i + 1)))
|
||||
|
||||
class Track:
    """One MusicBrainz recording, fetched eagerly and cached in the database.

    The constructor performs one MusicBrainz API call, resolves the owning
    release (last entry of the recording's release list) if none was given,
    collects the credited artists and caches everything via :meth:`save`.
    """

    def __init__(
            self,
            database,
            logger,
            musicbrainz_releasetrackid: str,
            release=None,
            track_number: str = None
    ):
        self.database = database
        self.logger = logger
        """
        release: Release
        feature_artists: list
        """

        self.musicbrainz_releasetrackid = musicbrainz_releasetrackid
        self.release = release
        self.artists = []

        self.track_number = track_number

        # single network round trip; everything below reads from this result
        result = musicbrainzngs.get_recording_by_id(self.musicbrainz_releasetrackid,
                                                    includes=["artists", "releases", "recording-rels", "isrcs",
                                                              "work-level-rels"])
        recording_data = result['recording']
        # the LAST release in the list is treated as the canonical one
        release_data = get_elem_from_obj(recording_data, ['release-list', -1])
        if self.release is None:
            # fetch_furter=False prevents the release from re-fetching its
            # whole tracklist (which would recurse back into Track)
            self.release = MetadataDownloader.Release(self.database, self.logger,
                                                      get_elem_from_obj(release_data, ['id']), fetch_furter=False)

        for artist_data in get_elem_from_obj(recording_data, ['artist-credit'], return_if_none=[]):
            self.append_artist(get_elem_from_obj(artist_data, ['artist', 'id']))

        # only the first ISRC (if any) is kept
        self.isrc = get_elem_from_obj(recording_data, ['isrc-list', 0])
        self.title = recording_data['title']

        self.save()

    def __str__(self):
        return f"{self.title}: {self.isrc}"

    def save(self):
        """Write this track's metadata into the local cache database."""
        self.logger.info(f"caching track {self}")

        # NOTE(review): keyword "feature_aritsts" looks like a typo of
        # "feature_artists", but it must match the database API — confirm
        # against Database.add_track before changing.
        self.database.add_track(
            musicbrainz_releasetrackid=self.musicbrainz_releasetrackid,
            musicbrainz_albumid=self.release.musicbrainz_albumid,
            feature_aritsts=[artist.musicbrainz_artistid for artist in self.artists],
            tracknumber=self.track_number,
            track=self.title,
            isrc=self.isrc
        )

    def append_artist(self, artist_id: str):
        """Append the artist if not already present; return the (new or existing) Artist.

        Returns None when artist_id is None.
        """
        if artist_id is None:
            return

        # deduplicate by MusicBrainz artist id
        for existing_artist in self.artists:
            if artist_id == existing_artist.musicbrainz_artistid:
                return existing_artist
        # new_release_groups=False: don't recurse into the artist's discography
        new_artist = MetadataDownloader.Artist(self.database, self.logger, artist_id, new_release_groups=False)
        self.artists.append(new_artist)
        return new_artist
|
||||
|
||||
def download(self, option: dict):
    """Fetch and cache one MusicBrainz entity, dispatched by kind.

    :param option: mapping with keys 'type' (one of "artist",
        "release_group", "release", "track") and 'id' (the MusicBrainz
        id of the entity to fetch).
    """
    type_ = option['type']
    mb_id = option['id']

    if type_ == "artist":
        self.Artist(self.database, self.logger, mb_id)
    elif type_ == "release_group":
        self.ReleaseGroup(self.database, self.logger, mb_id)
    elif type_ == "release":
        self.Release(self.database, self.logger, mb_id)
    elif type_ == "track":
        self.Track(self.database, self.logger, mb_id)
    else:
        # BUG FIX: unknown kinds used to be silently ignored; surface them
        self.logger.warning(f"unknown download type: {type_!r}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
    import tempfile
    import os

    # work inside a dedicated folder in the system temp directory
    temp_folder = "music-downloader"
    temp_dir = os.path.join(tempfile.gettempdir(), temp_folder)
    if not os.path.exists(temp_dir):
        os.mkdir(temp_dir)

    logging.basicConfig(level=logging.DEBUG)
    db_logger = logging.getLogger("database")
    db_logger.setLevel(logging.DEBUG)

    import database

    # reset_anyways=True: start from a fresh cache database on every demo run
    database_ = database.Database(os.path.join(temp_dir, "metadata.db"),
                                  os.path.join(temp_dir, "database_structure.sql"), db_logger,
                                  reset_anyways=True)

    download_logger = logging.getLogger("metadata downloader")
    download_logger.setLevel(logging.INFO)

    downloader = MetadataDownloader(database_, download_logger)

    # demo: fetch and cache a single track by its MusicBrainz id
    downloader.download({'id': 'd2006339-9e98-4624-a386-d503328eb854', 'type': 'track'})
    # downloader.download({'id': 'cdd16860-35fd-46af-bd8c-5de7b15ebc31', 'type': 'release'})
    # download({'id': '4b9af532-ef7e-42ab-8b26-c466327cb5e0', 'type': 'release'})
    # download({'id': 'c24ed9e7-6df9-44de-8570-975f1a5a75d1', 'type': 'track'})
|
142
src/metadata/metadata.py
Normal file
142
src/metadata/metadata.py
Normal file
@@ -0,0 +1,142 @@
|
||||
import logging
|
||||
import musicbrainzngs
|
||||
|
||||
from metadata import options
|
||||
|
||||
# silence musicbrainzngs' own chatty logging
mb_log = logging.getLogger("musicbrainzngs")
mb_log.setLevel(logging.WARNING)
# identify this client to the MusicBrainz web service
musicbrainzngs.set_useragent("metadata receiver", "0.1", "https://github.com/HeIIow2/music-downloader")

# NOTE(review): appears unused within this module — confirm before removing
KNOWN_KIND_OF_OPTIONS = ["artist", "release", "track"]
|
||||
|
||||
|
||||
class Search:
    """Interactive MusicBrainz search session.

    Keeps a history of option sets (``options.Options`` instances); every
    browse/choose step appends a new set so the caller can step back with
    :meth:`get_previous_options`.
    """

    def __init__(self, query: str = None, artist: str = None, temp: str = "temp"):
        """Start a session from a free-text query or an artist name.

        :param query: free-text search across artists, releases and recordings
        :param artist: artist-only search, used when ``query`` is None
        :param temp: working directory name (stored for later use)
        :raises ValueError: if neither ``query`` nor ``artist`` is given
        """
        if query is None and artist is None:
            raise ValueError("no query provided")

        self.options_history = []
        self.current_options = None
        self.current_chosen_option = None

        self.temp = temp

        # initial search
        if query is not None:
            self.set_options(
                options.Options([musicbrainzngs.search_artists(query), musicbrainzngs.search_releases(query),
                                 musicbrainzngs.search_recordings(query)]))
        elif artist is not None:
            self.set_options(options.Options([musicbrainzngs.search_artists(artist=artist)]))

    def browse_artist(self, artist: dict, limit: int = 25):
        """Show the artist itself plus its releases and recordings."""
        options_sets = [
            {"artist-list": [artist, ], "artist-count": 1},
            musicbrainzngs.browse_releases(artist=artist["id"], limit=limit),
            musicbrainzngs.browse_recordings(artist=artist["id"], limit=limit)
        ]
        return self.set_options(options.Options(options_sets))

    def browse_release(self, release: dict, limit: int = 25):
        """Show a release's artists, the release itself and its recordings."""
        options_sets = [
            musicbrainzngs.browse_artists(release=release["id"], limit=limit),
            {"release-list": [release, ], "release-count": 1},
            musicbrainzngs.browse_recordings(release=release["id"], limit=limit)
        ]
        return self.set_options(options.Options(options_sets))

    def browse_track(self, track: dict, limit: int = 25):
        """Show a recording's artists, its releases and the recording itself."""
        options_sets = [
            musicbrainzngs.browse_artists(recording=track["id"], limit=limit),
            musicbrainzngs.browse_releases(recording=track["id"], limit=limit),
            {"recording-list": [track, ], "recording-count": 1}
        ]
        return self.set_options(options.Options(options_sets))

    def choose(self, index, limit: int = 25, ignore_limit_for_tracklist: bool = True):
        """Select option ``index`` of the current set and browse into it.

        :param limit: result limit for the follow-up browse calls
        :param ignore_limit_for_tracklist: when True, releases and tracks
            are browsed with a limit of 100 so full tracklists fit
        """
        if not self.current_options.choose(index):
            return self.current_options

        self.current_chosen_option = self.current_options.get_current_option(komplex=True)
        kind = self.current_chosen_option['type']
        if kind == 'artist':
            return self.browse_artist(self.current_chosen_option, limit=limit)
        if kind == 'release':
            # BUG FIX: an unconditional `release_limit = 100` used to
            # overwrite the value below, making `limit` and the
            # `ignore_limit_for_tracklist` flag dead for releases.
            release_limit = limit if not ignore_limit_for_tracklist else 100
            return self.browse_release(self.current_chosen_option, limit=release_limit)
        if kind == 'track':
            track_limit = limit if not ignore_limit_for_tracklist else 100
            return self.browse_track(self.current_chosen_option, limit=track_limit)

        return self.current_options

    def get_options(self):
        """Return the most recent option set."""
        return self.current_options

    def set_options(self, option_instance):
        """Append ``option_instance`` to the history and make it current."""
        self.options_history.append(option_instance)
        self.current_options = option_instance

        return option_instance

    def get_previous_options(self):
        """Discard the latest option set and return the one before it."""
        self.options_history.pop(-1)
        self.current_options = self.options_history[-1]
        return self.current_options

    # attribute-style access to the current option set
    options = property(fget=get_options)
|
||||
|
||||
|
||||
def automated_demo():
    """Scripted walk-through: search, drill into results, download.

    NOTE(review): calls search.download(), which is not defined on the
    Search class visible in this file — confirm it exists elsewhere.
    """
    search = Search(query="psychonaut 4")
    print(search.options)
    print(search.choose(0))
    search.download()
    print(search.choose(2))
    search.download()
    print(search.choose(4))
    print(search.download())
|
||||
|
||||
|
||||
def interactive_demo():
    """Minimal REPL over Search: browse options and trigger downloads.

    Commands: "q" quits, "." reprints the current options, ".." goes back
    one level, an integer drills into that option, "d" downloads and quits.

    NOTE(review): relies on search.download(), which is not defined on the
    Search class visible in this file — confirm it exists elsewhere.
    """
    search = Search(query=input("initial query: "))
    print(search.options)
    while True:
        # BUG FIX: the result of .strip() was previously discarded
        # (`input_.strip()` on its own line), so commands typed with
        # surrounding whitespace were never recognized.
        input_ = input(
            "d to download, q to quit, .. for previous options, . for current options, int for this element: ").lower().strip()
        if input_ == "q":
            break
        if input_ == ".":
            print(search.options)
            continue
        if input_ == "..":
            print(search.get_previous_options())
            continue
        if input_.isdigit():
            print(search.choose(int(input_)))
            continue
        if input_ == "d":
            search.download()
            break
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # interactive_demo()
    # automated_demo()
    search = Search(query="psychonaut 4")
    # NOTE(review): download_release/download_track/download_artist are not
    # defined on the Search class visible in this file — confirm they exist
    # elsewhere before running.
    # search.download_release("27f00fb8-983c-4d5c-950f-51418aac55dc")
    search.download_release("1aeb676f-e556-4b17-b45e-64ab69ef0375")
    # for track_ in search.download_artist("c0c720b5-012f-4204-a472-981403f37b12"):
    #     print(track_)
    # res = search.download_track("83a30323-aee1-401a-b767-b3c1bdd026c0")
    # res = search.download_track("5e1ee2c5-502c-44d3-b1bc-22803441d8c6")
    res = search.download_track("86b43bec-eea6-40ae-8624-c1e404204ba1")
    # res = search.download_track("5cc28584-10c6-40e2-b6d4-6891e7e7c575")

    # print all non-empty metadata fields of the first result
    for key in res[0]:
        if res[0][key] is None:
            continue

        print(key, res[0][key])
|
24
src/metadata/object_handeling.py
Normal file
24
src/metadata/object_handeling.py
Normal file
@@ -0,0 +1,24 @@
|
||||
from datetime import date
|
||||
|
||||
|
||||
def get_elem_from_obj(current_object, keys: list, after_process=lambda x: x, return_if_none=None):
    """Safely walk a nested dict/list structure along ``keys``.

    :param current_object: nested structure of dicts and lists (e.g. a
        parsed MusicBrainz response)
    :param keys: path segments; strings index dicts, ints index lists
        (negative indices are supported)
    :param after_process: applied to the found value before returning it
    :param return_if_none: returned when any path segment is missing
    :return: ``after_process(value)`` or ``return_if_none``

    BUG FIX: the old check ``key in current_object`` tested *value*
    membership on lists, which could pass and then crash with IndexError
    (or TypeError for string keys on lists); negative indices into empty
    lists also crashed. All missing-path cases now return
    ``return_if_none`` instead of raising.
    """
    for key in keys:
        if isinstance(key, int) and isinstance(current_object, (list, tuple)):
            # integer index: accept negative indices, reject out-of-range
            if -len(current_object) <= key < len(current_object):
                current_object = current_object[key]
            else:
                return return_if_none
        elif isinstance(current_object, dict) and key in current_object:
            current_object = current_object[key]
        else:
            return return_if_none
    return after_process(current_object)
|
||||
|
||||
|
||||
def parse_music_brainz_date(mb_date: str) -> date:
    """Parse a (possibly partial) MusicBrainz date string into a date.

    MusicBrainz dates come as "YYYY", "YYYY-MM" or "YYYY-MM-DD"; missing
    or non-numeric parts default to 1 so partial dates still sort
    chronologically.

    BUG FIX: "YYYY-MM" dates previously fell through every branch and
    returned 0001-01-01, losing the year entirely; non-numeric segments
    in a full date raised ValueError.

    :param mb_date: the raw date string from a MusicBrainz response
    :return: a ``datetime.date`` with unknown parts set to 1
    """
    # [year, month, day], each defaulting to 1
    values = [1, 1, 1]

    parts = mb_date.split("-") if mb_date else []
    for i, part in enumerate(parts[:3]):
        # a part like "00" or "????" keeps its default of 1
        if part.isdigit() and int(part) > 0:
            values[i] = int(part)

    return date(*values)
|
118
src/metadata/options.py
Normal file
118
src/metadata/options.py
Normal file
File diff suppressed because one or more lines are too long
370
src/metadata/search.py
Normal file
370
src/metadata/search.py
Normal file
@@ -0,0 +1,370 @@
|
||||
from typing import List
|
||||
import logging
|
||||
import musicbrainzngs
|
||||
|
||||
try:
    # running from inside src/metadata: plain module import works
    from object_handeling import get_elem_from_obj, parse_music_brainz_date

except ModuleNotFoundError:
    # running from the project root: import via the package
    from metadata.object_handeling import get_elem_from_obj, parse_music_brainz_date

# silence musicbrainzngs' own chatty logging
mb_log = logging.getLogger("musicbrainzngs")
mb_log.setLevel(logging.WARNING)
# identify this client to the MusicBrainz web service
musicbrainzngs.set_useragent("metadata receiver", "0.1", "https://github.com/HeIIow2/music-downloader")

# maximum number of '#'-prefixed parameters allowed in a search query
# (name spelling kept: it is referenced as MAX_PARAMATERS below)
MAX_PARAMATERS = 3
# entity kinds an Option may represent
OPTION_TYPES = ['artist', 'release_group', 'release', 'recording']
|
||||
|
||||
|
||||
class Option:
    """One selectable MusicBrainz search result (artist, release group,
    release or recording), identified by kind, MusicBrainz id and name.
    """

    def __init__(self, type_: str, id_: str, name: str, additional_info: str = "") -> None:
        """
        :param type_: one of OPTION_TYPES
        :param id_: the MusicBrainz id of the entity
        :param name: display name/title
        :param additional_info: extra text appended to the repr
        :raises ValueError: if ``type_`` is not a known option type
        """
        if type_ not in OPTION_TYPES:
            # BUG FIX: error message said "Leagal Values"
            raise ValueError(f"type: {type_} doesn't exist. Legal Values: {OPTION_TYPES}")
        self.type = type_
        self.name = name
        self.id = id_

        self.additional_info = additional_info

    def __getitem__(self, item):
        # dict-style read access; 'kind' is an alias for 'type'
        map_ = {
            "id": self.id,
            "type": self.type,
            "kind": self.type,
            "name": self.name
        }
        return map_[item]

    def __repr__(self) -> str:
        # tabs pad the kind so names line up in a printed option list
        type_repr = {
            'artist': 'artist\t\t',
            'release_group': 'release group\t',
            'release': 'release\t\t',
            'recording': 'recording\t'
        }
        return f"{type_repr[self.type]}: \"{self.name}\"{self.additional_info}"
|
||||
|
||||
class MultipleOptions:
    """Pretty-printable wrapper around a list of Option objects.

    The repr shows one zero-padded, numbered line per option, matching the
    indices accepted by Search.choose().
    """

    def __init__(self, option_list: List[Option]) -> None:
        self.option_list = option_list

    def __repr__(self) -> str:
        numbered_lines = []
        for index, option in enumerate(self.option_list):
            numbered_lines.append(f"{str(index).zfill(2)}) {option.__repr__()}")
        return "\n".join(numbered_lines)
|
||||
|
||||
|
||||
class Search:
    """Stateful MusicBrainz browser.

    A text search produces a list of :class:`Option` objects; ``choose()``
    drills into one of them and fetches the related options (artist →
    release groups, release group → releases + tracklist, …). Every step
    is pushed onto ``options_history`` so the UI can step back.
    """

    def __init__(self, logger: logging.Logger) -> None:
        self.logger = logger

        # each entry is the list of Option objects offered at that step
        self.options_history = []
        # BUG FIX: this was a bare annotation (`self.current_option: Option`)
        # that never assigned a value, so fetch_new_options() raised
        # AttributeError when called before choose(); initialize explicitly.
        self.current_option = None  # Option | None

    def append_new_choices(self, new_choices: List[Option]) -> MultipleOptions:
        """Record a new option list in the history and wrap it for display."""
        self.options_history.append(new_choices)
        return MultipleOptions(new_choices)

    def get_previous_options(self) -> MultipleOptions:
        """Step back one level in the option history."""
        # robustness: popping the only (or a missing) entry used to raise IndexError
        if len(self.options_history) > 1:
            self.options_history.pop(-1)
        if not self.options_history:
            return MultipleOptions([])
        return MultipleOptions(self.options_history[-1])

    @staticmethod
    def fetch_new_options_from_artist(artist: Option):
        """Return a list of the artist itself followed by every release group,
        sorted chronologically by first release date.
        """
        result = musicbrainzngs.get_artist_by_id(artist.id, includes=["release-groups", "releases"])
        artist_data = get_elem_from_obj(result, ['artist'], return_if_none={})

        result = [artist]

        # sort all release groups by date to have them in chronological order
        # BUG FIX: direct indexing (artist_data['release-group-list']) raised
        # KeyError for artists without release groups; default to [].
        release_group_list = get_elem_from_obj(artist_data, ['release-group-list'], return_if_none=[])
        for i, release_group in enumerate(release_group_list):
            release_group_list[i]['first-release-date'] = parse_music_brainz_date(release_group['first-release-date'])
        release_group_list.sort(key=lambda x: x['first-release-date'])
        release_group_list = [Option("release_group", get_elem_from_obj(release_group_, ['id']),
                                     get_elem_from_obj(release_group_, ['title']),
                                     additional_info=f" ({get_elem_from_obj(release_group_, ['type'])}) from {get_elem_from_obj(release_group_, ['first-release-date'])}")
                              for release_group_ in release_group_list]

        result.extend(release_group_list)
        return result

    @staticmethod
    def fetch_new_options_from_release_group(release_group: Option):
        """Return the credited artists, the release group itself, its releases
        and the tracklist of the first release.
        """
        results = []

        result = musicbrainzngs.get_release_group_by_id(release_group.id,
                                                        includes=["artist-credits", "releases"])
        release_group_data = get_elem_from_obj(result, ['release-group'], return_if_none={})
        artist_datas = get_elem_from_obj(release_group_data, ['artist-credit'], return_if_none={})
        release_datas = get_elem_from_obj(release_group_data, ['release-list'], return_if_none={})

        # appending all the artists to results
        for artist_data in artist_datas:
            results.append(Option('artist', get_elem_from_obj(artist_data, ['artist', 'id']),
                                  get_elem_from_obj(artist_data, ['artist', 'name'])))

        # appending initial release group
        results.append(release_group)

        # appending all releases; remember the first one for its tracklist
        first_release = None
        for i, release_data in enumerate(release_datas):
            results.append(
                Option('release', get_elem_from_obj(release_data, ['id']), get_elem_from_obj(release_data, ['title']),
                       additional_info=f" ({get_elem_from_obj(release_data, ['status'])})"))
            if i == 0:
                first_release = results[-1]

        # append tracklist of first release
        if first_release is not None:
            results.extend(Search.fetch_new_options_from_release(first_release, only_tracklist=True))

        return results

    @staticmethod
    def fetch_new_options_from_release(release: Option, only_tracklist: bool = False):
        """Return artists, release group, the release itself and its tracklist
        (first medium). With ``only_tracklist=True`` only the tracklist is returned.
        """
        results = []
        result = musicbrainzngs.get_release_by_id(release.id,
                                                  includes=["recordings", "labels", "release-groups", "artist-credits"])
        release_data = get_elem_from_obj(result, ['release'], return_if_none={})
        label_data = get_elem_from_obj(release_data, ['label-info-list'], return_if_none={})
        recording_datas = get_elem_from_obj(release_data, ['medium-list', 0, 'track-list'], return_if_none=[])
        release_group_data = get_elem_from_obj(release_data, ['release-group'], return_if_none={})
        artist_datas = get_elem_from_obj(release_data, ['artist-credit'], return_if_none={})

        # appending all the artists to results
        for artist_data in artist_datas:
            results.append(Option('artist', get_elem_from_obj(artist_data, ['artist', 'id']),
                                  get_elem_from_obj(artist_data, ['artist', 'name'])))

        # appending the according release group
        results.append(Option("release_group", get_elem_from_obj(release_group_data, ['id']),
                              get_elem_from_obj(release_group_data, ['title']),
                              additional_info=f" ({get_elem_from_obj(release_group_data, ['type'])}) from {get_elem_from_obj(release_group_data, ['first-release-date'])}"))

        # appending the release
        results.append(release)

        # appending the tracklist, but first putting it in a list, in case of
        # only_tracklist being True to return this instead
        tracklist = []
        for i, recording_data in enumerate(recording_datas):
            recording_data = recording_data['recording']
            tracklist.append(Option('recording', get_elem_from_obj(recording_data, ['id']),
                                    get_elem_from_obj(recording_data, ['title']),
                                    f" ({get_elem_from_obj(recording_data, ['length'])}) from {get_elem_from_obj(recording_data, ['artist-credit-phrase'])}"))

        if only_tracklist:
            return tracklist
        results.extend(tracklist)
        return results

    @staticmethod
    def fetch_new_options_from_record(recording: Option):
        """Return the credited artists, every release containing the recording,
        and the recording itself.
        """
        results = []

        result = musicbrainzngs.get_recording_by_id(recording.id, includes=["artists", "releases"])
        recording_data = result['recording']
        release_datas = get_elem_from_obj(recording_data, ['release-list'], return_if_none=[])
        artist_datas = get_elem_from_obj(recording_data, ['artist-credit'], return_if_none={})

        # appending all the artists to results
        for artist_data in artist_datas:
            results.append(Option('artist', get_elem_from_obj(artist_data, ['artist', 'id']),
                                  get_elem_from_obj(artist_data, ['artist', 'name'])))

        # appending all releases
        for i, release_data in enumerate(release_datas):
            results.append(
                Option('release', get_elem_from_obj(release_data, ['id']), get_elem_from_obj(release_data, ['title']),
                       additional_info=f" ({get_elem_from_obj(release_data, ['status'])})"))

        results.append(recording)

        return results

    def fetch_new_options(self) -> MultipleOptions:
        """Expand the currently chosen option into its related options."""
        if self.current_option is None:
            # BUG FIX: used to return -1; return an empty MultipleOptions so
            # callers can always print/iterate the result.
            return MultipleOptions([])

        result = []
        if self.current_option.type == 'artist':
            result = self.fetch_new_options_from_artist(self.current_option)
        elif self.current_option.type == 'release_group':
            result = self.fetch_new_options_from_release_group(self.current_option)
        elif self.current_option.type == 'release':
            result = self.fetch_new_options_from_release(self.current_option)
        elif self.current_option.type == 'recording':
            result = self.fetch_new_options_from_record(self.current_option)

        return self.append_new_choices(result)

    def choose(self, index: int) -> MultipleOptions:
        """Select option ``index`` from the latest option list and expand it."""
        if len(self.options_history) == 0:
            # consistency fix: use the instance logger like everywhere else
            self.logger.error("initial query neaded before choosing")
            return MultipleOptions([])

        latest_options = self.options_history[-1]
        if index >= len(latest_options):
            self.logger.error("index outside of options")
            return MultipleOptions([])

        self.current_option = latest_options[index]
        return self.fetch_new_options()

    @staticmethod
    def search_recording_from_text(artist: str = None, release_group: str = None, recording: str = None, query: str = None):
        """Full-text recording search; returns a list of recording Options."""
        result = musicbrainzngs.search_recordings(artist=artist, release=release_group, recording=recording, query=query)
        recording_list = get_elem_from_obj(result, ['recording-list'], return_if_none=[])

        resulting_options = [
            Option("recording", get_elem_from_obj(recording_, ['id']), get_elem_from_obj(recording_, ['title']),
                   additional_info=f" of {get_elem_from_obj(recording_, ['release-list', 0, 'title'])} by {get_elem_from_obj(recording_, ['artist-credit', 0, 'name'])}")
            for recording_ in recording_list]
        return resulting_options

    @staticmethod
    def search_release_group_from_text(artist: str = None, release_group: str = None, query: str = None):
        """Full-text release-group search; returns a list of release_group Options."""
        result = musicbrainzngs.search_release_groups(artist=artist, releasegroup=release_group, query=query)
        release_group_list = get_elem_from_obj(result, ['release-group-list'], return_if_none=[])

        resulting_options = [Option("release_group", get_elem_from_obj(release_group_, ['id']),
                                    get_elem_from_obj(release_group_, ['title']),
                                    additional_info=f" by {get_elem_from_obj(release_group_, ['artist-credit', 0, 'name'])}")
                             for release_group_ in release_group_list]
        return resulting_options

    @staticmethod
    def search_artist_from_text(artist: str = None, query: str = None):
        """Full-text artist search; returns a list of artist Options."""
        result = musicbrainzngs.search_artists(artist=artist, query=query)
        artist_list = get_elem_from_obj(result, ['artist-list'], return_if_none=[])

        resulting_options = [Option("artist", get_elem_from_obj(artist_, ['id']), get_elem_from_obj(artist_, ['name']),
                                    additional_info=f": {', '.join([i['name'] for i in get_elem_from_obj(artist_, ['tag-list'], return_if_none=[])])}")
                             for artist_ in artist_list]
        return resulting_options

    def search_from_text(self, artist: str = None, release_group: str = None, recording: str = None) -> MultipleOptions:
        """Search with explicit fields, most specific one (recording) winning."""
        self.logger.info(f"searching specified artist: \"{artist}\", release group: \"{release_group}\", recording: \"{recording}\"")
        if artist is None and release_group is None and recording is None:
            self.logger.error("either artist, release group or recording has to be set")
            # BUG FIX: used to return -1, which callers cannot display
            return MultipleOptions([])

        if recording is not None:
            self.logger.info("search for recording")
            results = self.search_recording_from_text(artist=artist, release_group=release_group, recording=recording)
        elif release_group is not None:
            self.logger.info("search for release group")
            results = self.search_release_group_from_text(artist=artist, release_group=release_group)
        else:
            self.logger.info("search for artist")
            results = self.search_artist_from_text(artist=artist)

        return self.append_new_choices(results)

    def search_from_text_unspecified(self, query: str) -> MultipleOptions:
        """Search the free-text query across artists, release groups and recordings."""
        self.logger.info(f"searching unspecified: \"{query}\"")

        results = []
        results.extend(self.search_artist_from_text(query=query))
        results.extend(self.search_release_group_from_text(query=query))
        results.extend(self.search_recording_from_text(query=query))

        return self.append_new_choices(results)

    def search_from_query(self, query: str) -> MultipleOptions:
        """Parse a query string and dispatch the appropriate search.

        A '#' starts a new parameter; the letter directly after it selects
        the kind, e.g. "#a Psychonaut 4 #r Tired, Numb and #t Drop by Drop"
        (a = artist, r = release group, t = track/recording). A query
        without any '#' is treated as an unspecified free-text query.
        """
        if query is None:
            return MultipleOptions([])

        if '#' not in query:
            return self.search_from_text_unspecified(query)

        artist = None
        release_group = None
        recording = None

        query = query.strip()
        # BUG FIX: list.remove('') raised ValueError when the query did not
        # start with '#' and removed at most one empty entry; filter instead.
        parameters = [part for part in query.split('#') if part != '']

        if len(parameters) > MAX_PARAMATERS:
            raise ValueError(f"too many parameters. Only {MAX_PARAMATERS} are allowed")

        for parameter in parameters:
            splitted = parameter.split(" ")
            type_ = splitted[0]
            input_ = " ".join(splitted[1:]).strip()

            if type_ == "a":
                artist = input_
                continue
            if type_ == "r":
                release_group = input_
                continue
            if type_ == "t":
                recording = input_
                continue

        return self.search_from_text(artist=artist, release_group=release_group, recording=recording)
|
||||
|
||||
|
||||
def automated_demo():
    """Scripted walk-through of the Search drill-down flow.

    NOTE(review): uses the module-global ``logger_``, which is only defined
    inside the ``__main__`` block below — this function is not callable
    when the module is imported.
    """
    search = Search(logger=logger_)
    search.search_from_text(artist="I Prevail")

    # choose an artist
    search.choose(0)
    # choose a release group
    search.choose(9)
    # choose a release
    search.choose(2)
    # choose a recording
    search.choose(4)
|
||||
|
||||
|
||||
def interactive_demo():
    """Minimal REPL over Search: query, drill down, step back.

    Commands: "q" or "ok" quit, ".." goes back one level, an integer
    drills into that option, anything else is treated as a new query.

    NOTE(review): "ok to download" currently just breaks the loop — no
    download is triggered; uses the module-global ``logger_`` defined in
    the ``__main__`` block below.
    """
    search = Search(logger=logger_)
    while True:
        # BUG FIX: the result of .strip() was previously discarded
        # (`input_.strip()` on its own line), so commands typed with
        # surrounding whitespace were never recognized.
        input_ = input("q to quit, .. for previous options, int for this element, str to search for query, ok to download: ").strip()
        if input_.lower() == "ok":
            break
        if input_.lower() == "q":
            break
        if input_.lower() == "..":
            search.get_previous_options()
            continue
        if input_.isdigit():
            search.choose(int(input_))
            continue
        search.search_from_query(input_)
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # verbose logging for the manual test session
    logging.basicConfig(level=logging.DEBUG)
    logger_ = logging.getLogger("test")

    interactive_demo()
|
||||
|
@@ -1,8 +1,16 @@
|
||||
import logging
|
||||
import time
|
||||
|
||||
import requests
|
||||
import bs4
|
||||
|
||||
import phonetic_compares
|
||||
try:
|
||||
import phonetic_compares
|
||||
except ModuleNotFoundError:
|
||||
from scraping import phonetic_compares
|
||||
|
||||
TRIES = 5
|
||||
TIMEOUT = 10
|
||||
|
||||
session = requests.Session()
|
||||
session.headers = {
|
||||
@@ -12,8 +20,8 @@ session.headers = {
|
||||
|
||||
|
||||
def get_musify_url(row):
|
||||
title = row.title
|
||||
artists = row.artist
|
||||
title = row['title']
|
||||
artists = row['artists']
|
||||
|
||||
url = f"https://musify.club/search/suggestions?term={artists[0]} - {title}"
|
||||
|
||||
@@ -40,7 +48,10 @@ def get_download_link(default_url):
|
||||
|
||||
def download_from_musify(file, url):
|
||||
logging.info(f"downloading: '{url}'")
|
||||
r = session.get(url)
|
||||
try:
|
||||
r = session.get(url, timeout=15)
|
||||
except requests.exceptions.ConnectionError:
|
||||
return -1
|
||||
if r.status_code != 200:
|
||||
if r.status_code == 404:
|
||||
logging.warning(f"{r.url} was not found")
|
||||
@@ -60,18 +71,25 @@ def download(row):
|
||||
return download_from_musify(file_, url)
|
||||
|
||||
|
||||
def get_soup_of_search(query: str):
|
||||
def get_soup_of_search(query: str, trie=0):
|
||||
url = f"https://musify.club/search?searchText={query}"
|
||||
logging.debug(f"Trying to get soup from {url}")
|
||||
r = session.get(url)
|
||||
if r.status_code != 200:
|
||||
if r.status_code in [503] and trie < TRIES:
|
||||
logging.warning(f"youtube blocked downloading. ({trie}-{TRIES})")
|
||||
logging.warning(f"retrying in {TIMEOUT} seconds again")
|
||||
time.sleep(TIMEOUT)
|
||||
return get_soup_of_search(query, trie=trie+1)
|
||||
|
||||
logging.warning("too many tries, returning")
|
||||
raise ConnectionError(f"{r.url} returned {r.status_code}:\n{r.content}")
|
||||
return bs4.BeautifulSoup(r.content, features="html.parser")
|
||||
|
||||
|
||||
def search_for_track(row):
|
||||
track = row.title
|
||||
artist = row.artist
|
||||
track = row['title']
|
||||
artist = row['artists']
|
||||
|
||||
soup = get_soup_of_search(f"{artist[0]} - {track}")
|
||||
tracklist_container_soup = soup.find_all("div", {"class": "playlist"})
|
@@ -3,7 +3,10 @@ import pandas as pd
|
||||
import logging
|
||||
import time
|
||||
|
||||
import phonetic_compares
|
||||
try:
|
||||
import phonetic_compares
|
||||
except ModuleNotFoundError:
|
||||
from scraping import phonetic_compares
|
||||
|
||||
YDL_OPTIONS = {'format': 'bestaudio', 'noplaylist': 'True'}
|
||||
YOUTUBE_URL_KEY = 'webpage_url'
|
@@ -1,30 +1,17 @@
|
||||
import os.path
|
||||
import shlex
|
||||
import pandas as pd
|
||||
import json
|
||||
import logging
|
||||
|
||||
|
||||
class UrlPath:
|
||||
def __init__(self, genre: str, temp: str = "temp", file: str = ".cache3.csv", step_two_file: str = ".cache2.csv"):
|
||||
self.temp = temp
|
||||
self.file = file
|
||||
self.metadata = pd.read_csv(os.path.join(self.temp, step_two_file), index_col=0)
|
||||
def __init__(self, database, logger: logging.Logger, genre: str):
|
||||
self.database = database
|
||||
self.logger = logger
|
||||
|
||||
self.genre = genre
|
||||
|
||||
new_metadata = []
|
||||
|
||||
for idx, row in self.metadata.iterrows():
|
||||
for row in self.database.get_tracks_without_filepath():
|
||||
file, path = self.get_path_from_row(row)
|
||||
new_row = dict(row)
|
||||
new_row['path'] = path
|
||||
new_row['file'] = file
|
||||
new_row['genre'] = self.genre
|
||||
new_metadata.append(new_row)
|
||||
|
||||
new_df = pd.DataFrame(new_metadata)
|
||||
new_df.to_csv(os.path.join(self.temp, self.file))
|
||||
|
||||
self.database.set_filepath(row['id'], file, path, genre)
|
||||
|
||||
def get_path_from_row(self, row):
|
||||
"""
|
||||
@@ -33,7 +20,9 @@ class UrlPath:
|
||||
:param row:
|
||||
:return: path:
|
||||
"""
|
||||
return os.path.join(self.get_genre(), self.get_artist(row), self.get_album(row), f"{self.get_song(row)}.mp3"), os.path.join(self.get_genre(), self.get_artist(row), self.get_album(row))
|
||||
return os.path.join(self.get_genre(), self.get_artist(row), self.get_album(row),
|
||||
f"{self.get_song(row)}.mp3"), os.path.join(self.get_genre(), self.get_artist(row),
|
||||
self.get_album(row))
|
||||
|
||||
def escape_part(self, part: str):
|
||||
return part.replace("/", " ")
|
||||
@@ -45,7 +34,7 @@ class UrlPath:
|
||||
return self.escape_part(row['album'])
|
||||
|
||||
def get_artist(self, row):
|
||||
artists = json.loads(row['artist'].replace("'", '"'))
|
||||
artists = [artist['name'] for artist in row['artists']]
|
||||
return self.escape_part(artists[0])
|
||||
|
||||
def get_song(self, row):
|
||||
|
Reference in New Issue
Block a user