big refactoring

This commit is contained in:
lars 2022-11-12 00:29:07 +01:00
parent 288e7a9b7b
commit f7129e796e
23 changed files with 239 additions and 492 deletions

View File

@ -1,4 +1,5 @@
# TO DO # TO DO
- refactor file system
- genius scraper only partially downloads lyrics: reproducible example https://genius.com/Zombiez-blut-lyrics - genius scraper only partially downloads lyrics: reproducible example https://genius.com/Zombiez-blut-lyrics
- LYRICS - LYRICS
- add complete search of musify (scraping of artist page etc.) as last resort - add complete search of musify (scraping of artist page etc.) as last resort

1
src/__init__.py Normal file
View File

@ -0,0 +1 @@
# NOTE(review): this rebinds the module-level __name__ that Python set on import;
# confirm nothing depends on the original import name before keeping it.
__name__ = "music downloader"

View File

@ -1,4 +1,5 @@
from metadata.database import Database from utils.shared import *
from metadata.download import MetadataDownloader from metadata.download import MetadataDownloader
import metadata.download import metadata.download
import metadata.search import metadata.search
@ -11,28 +12,6 @@ from lyrics_ import fetch_lyrics
import logging import logging
import os import os
import tempfile
TEMP_FOLDER = "music-downloader"
LOG_FILE = "download_logs.log"
DATABASE_FILE = "metadata.db"
DATABASE_STRUCTURE_FILE = "database_structure.sql"
DATABASE_STRUCTURE_FALLBACK = "https://raw.githubusercontent.com/HeIIow2/music-downloader/new_metadata/assets/database_structure.sql"
SEARCH_LOGGER = logging.getLogger("mb-cli")
DATABASE_LOGGER = logging.getLogger("database")
METADATA_DOWNLOAD_LOGGER = logging.getLogger("metadata-download")
URL_DOWNLOAD_LOGGER = logging.getLogger("ling-download")
PATH_LOGGER = logging.getLogger("create-paths")
DOWNLOAD_LOGGER = logging.getLogger("download")
NOT_A_GENRE = ".", "..", "misc_scripts", "Music", "script", ".git", ".idea"
MUSIC_DIR = os.path.expanduser('~/Music')
TOR = False
temp_dir = os.path.join(tempfile.gettempdir(), TEMP_FOLDER)
if not os.path.exists(temp_dir):
os.mkdir(temp_dir)
# configure logger default # configure logger default
logging.basicConfig( logging.basicConfig(
@ -44,12 +23,6 @@ logging.basicConfig(
] ]
) )
database = Database(os.path.join(temp_dir, DATABASE_FILE),
os.path.join(temp_dir, DATABASE_STRUCTURE_FILE),
DATABASE_STRUCTURE_FALLBACK,
DATABASE_LOGGER,
reset_anyways=True)
def get_existing_genre(): def get_existing_genre():
valid_directories = [] valid_directories = []
@ -61,7 +34,7 @@ def get_existing_genre():
def search_for_metadata(): def search_for_metadata():
search = metadata.search.Search(logger=SEARCH_LOGGER) search = metadata.search.Search()
while True: while True:
input_ = input( input_ = input(
@ -105,13 +78,6 @@ def get_genre():
def cli(start_at: int = 0): def cli(start_at: int = 0):
proxies = None
if TOR:
proxies = {
'http': 'socks5h://127.0.0.1:9150',
'https': 'socks5h://127.0.0.1:9150'
}
if start_at <= 2: if start_at <= 2:
genre = get_genre() genre = get_genre()
logging.info(f"{genre} has been set as genre.") logging.info(f"{genre} has been set as genre.")
@ -120,24 +86,24 @@ def cli(start_at: int = 0):
search = search_for_metadata() search = search_for_metadata()
# search = metadata.search.Option("release", "f8d4b24d-2c46-4e9c-8078-0c0f337c84dd", "Beautyfall") # search = metadata.search.Option("release", "f8d4b24d-2c46-4e9c-8078-0c0f337c84dd", "Beautyfall")
logging.info("Starting Downloading of metadata") logging.info("Starting Downloading of metadata")
metadata_downloader = MetadataDownloader(database, METADATA_DOWNLOAD_LOGGER) metadata_downloader = MetadataDownloader()
metadata_downloader.download(search) metadata_downloader.download({'type': search.type, 'id': search.id})
if start_at <= 1: if start_at <= 1:
logging.info("creating Paths") logging.info("creating Paths")
url_to_path.UrlPath(database, PATH_LOGGER, genre=genre) url_to_path.UrlPath(genre=genre)
if start_at <= 2: if start_at <= 2:
logging.info("Fetching Download Links") logging.info("Fetching Download Links")
download_links.Download(database, METADATA_DOWNLOAD_LOGGER, MUSIC_DIR, proxies=proxies) download_links.Download()
if start_at <= 3: if start_at <= 3:
logging.info("starting to download the mp3's") logging.info("starting to download the mp3's")
download.Download(database, DOWNLOAD_LOGGER, proxies=proxies, base_path=MUSIC_DIR) download.Download()
if start_at <= 4: if start_at <= 4:
logging.info("starting to fetch the lyrics") logging.info("starting to fetch the lyrics")
fetch_lyrics(database) fetch_lyrics()
if __name__ == "__main__": if __name__ == "__main__":

0
src/audio/__init__.py Normal file
View File

3
src/audio/song.py Normal file
View File

@ -0,0 +1,3 @@
class Song:
    """Placeholder for a song stored on disk; it keeps no state yet."""

    def __init__(self, path: str):
        """Accept the path of the audio file; nothing is stored or read so far."""

View File

@ -3,9 +3,9 @@ import requests
import os.path import os.path
from mutagen.easyid3 import EasyID3 from mutagen.easyid3 import EasyID3
from pydub import AudioSegment from pydub import AudioSegment
import logging
from scraping import musify, youtube_music from src.utils.shared import *
from src.scraping import musify, youtube_music
""" """
https://en.wikipedia.org/wiki/ID3 https://en.wikipedia.org/wiki/ID3
@ -17,19 +17,15 @@ print("\n".join(EasyID3.valid_keys.keys()))
print(EasyID3.valid_keys.keys()) print(EasyID3.valid_keys.keys())
""" """
logger = DOWNLOAD_LOGGER
class Download: class Download:
def __init__(self, database, logger: logging.Logger, proxies: dict = None, base_path: str = ""): def __init__(self):
if proxies is not None:
musify.set_proxy(proxies)
self.database = database
self.logger = logger
for row in database.get_tracks_to_download(): for row in database.get_tracks_to_download():
row['artist'] = [i['name'] for i in row['artists']] row['artist'] = [i['name'] for i in row['artists']]
row['file'] = os.path.join(base_path, row['file']) row['file'] = os.path.join(MUSIC_DIR, row['file'])
row['path'] = os.path.join(base_path, row['path']) row['path'] = os.path.join(MUSIC_DIR, row['path'])
if self.path_stuff(row['path'], row['file']): if self.path_stuff(row['path'], row['file']):
self.write_metadata(row, row['file']) self.write_metadata(row, row['file'])
@ -43,14 +39,15 @@ class Download:
download_success = youtube_music.download(row) download_success = youtube_music.download(row)
if download_success == -1: if download_success == -1:
self.logger.warning(f"couldn't download {row['url']} from {row['src']}") logger.warning(f"couldn't download {row['url']} from {row['src']}")
continue continue
self.write_metadata(row, row['file']) self.write_metadata(row, row['file'])
def write_metadata(self, row, file_path): @staticmethod
def write_metadata(row, file_path):
if not os.path.exists(file_path): if not os.path.exists(file_path):
self.logger.warning("something went really wrong") logger.warning("something went really wrong")
return False return False
# only convert the file to the proper format if mutagen doesn't work with it due to time # only convert the file to the proper format if mutagen doesn't work with it due to time
@ -68,13 +65,14 @@ class Download:
row[key] = str(row[key]) row[key] = str(row[key])
audiofile[key] = row[key] audiofile[key] = row[key]
self.logger.info("saving") logger.info("saving")
audiofile.save(file_path, v1=2) audiofile.save(file_path, v1=2)
def path_stuff(self, path: str, file_: str): @staticmethod
def path_stuff(path: str, file_: str):
# returns true if it shouldn't be downloaded # returns true if it shouldn't be downloaded
if os.path.exists(file_): if os.path.exists(file_):
self.logger.info(f"'{file_}' does already exist, thus not downloading.") logger.info(f"'{file_}' does already exist, thus not downloading.")
return True return True
os.makedirs(path, exist_ok=True) os.makedirs(path, exist_ok=True)
return False return False
@ -83,4 +81,4 @@ class Download:
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
s = requests.Session() s = requests.Session()
Download(session=s, base_path=os.path.expanduser('~/Music')) Download()

View File

@ -1,26 +1,27 @@
import requests import requests
import os
import logging
from scraping import musify, youtube_music from src.utils.shared import *
from src.scraping import musify, youtube_music, file_system
logger = URL_DOWNLOAD_LOGGER
class Download: class Download:
def __init__(self, database, logger: logging.Logger, music_dir: str, proxies: dict = None) -> None: def __init__(self) -> None:
self.music_dir = music_dir
self.database = database
self.logger = logger
if proxies is not None:
musify.set_proxy(proxies)
self.urls = [] self.urls = []
for row in self.database.get_tracks_without_src(): for row in database.get_tracks_without_src():
row['artists'] = [artist['name'] for artist in row['artists']] row['artists'] = [artist['name'] for artist in row['artists']]
id_ = row['id'] id_ = row['id']
if os.path.exists(os.path.join(self.music_dir, row['file'])): if os.path.exists(os.path.join(MUSIC_DIR, row['file'])):
self.logger.info(f"skipping the fetching of the download links, cuz {row['file']} already exists.") logger.info(f"skipping the fetching of the download links, cuz {row['file']} already exists.")
continue
# check File System
file_path = file_system.get_path(row)
if file_path is not None:
self.add_url(file_path, 'file', id_)
continue continue
# check YouTube # check YouTube
@ -41,18 +42,11 @@ class Download:
self.add_url(musify_url, 'musify', id_) self.add_url(musify_url, 'musify', id_)
continue continue
self.logger.warning(f"Didn't find any sources for {row['title']}") logger.warning(f"Didn't find any sources for {row['title']}")
def add_url(self, url: str, src: str, id_: str): def add_url(self, url: str, src: str, id_: str):
self.database.set_download_data(id_, url, src) database.set_download_data(id_, url, src)
if __name__ == "__main__": if __name__ == "__main__":
proxies = {
'http': 'socks5h://127.0.0.1:9150',
'https': 'socks5h://127.0.0.1:9150'
}
s = requests.Session()
s.proxies = proxies
download = Download() download = Download()

0
src/lyrics/__init__.py Normal file
View File

View File

@ -1,17 +1,11 @@
import requests import requests
import sys
import os
import logging
from typing import List from typing import List
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
import pycountry import pycountry
current = os.path.dirname(os.path.realpath(__file__)) from src.utils.shared import *
parent = os.path.dirname(current) from src.utils import phonetic_compares
sys.path.append(parent) from src.utils.object_handeling import get_elem_from_obj
# utils >:3
from tools import phonetic_compares
from tools.object_handeling import get_elem_from_obj
# search doesn't support isrc # search doesn't support isrc
# https://genius.com/api/search/multi?q=I Prevail - Breaking Down # https://genius.com/api/search/multi?q=I Prevail - Breaking Down
@ -23,16 +17,9 @@ session.headers = {
"Connection": "keep-alive", "Connection": "keep-alive",
"Referer": "https://genius.com/search/embed" "Referer": "https://genius.com/search/embed"
} }
logger = logging.getLogger("genius")
def set_proxy(proxies: dict):
session.proxies = proxies session.proxies = proxies
logger = logging.getLogger("genius")
def set_logger(logger_: logging.Logger):
global logger
logger = logger_
class Song: class Song:

0
src/metadata/__init__.py Normal file
View File

View File

@ -1,41 +1,31 @@
from src.utils.shared import *
from src.utils.object_handeling import get_elem_from_obj
from typing import List from typing import List
import musicbrainzngs import musicbrainzngs
import logging import logging
try:
from object_handeling import get_elem_from_obj, parse_music_brainz_date
except ModuleNotFoundError:
from metadata.object_handeling import get_elem_from_obj, parse_music_brainz_date
# I don't know if it would be feasible to set up my own mb instance # I don't know if it would be feasible to set up my own mb instance
# https://github.com/metabrainz/musicbrainz-docker # https://github.com/metabrainz/musicbrainz-docker
mb_log = logging.getLogger("musicbrainzngs")
mb_log.setLevel(logging.WARNING)
musicbrainzngs.set_useragent("metadata receiver", "0.1", "https://github.com/HeIIow2/music-downloader")
# IMPORTANT DOCUMENTATION WHICH CONTAINS FOR EXAMPLE THE INCLUDES # IMPORTANT DOCUMENTATION WHICH CONTAINS FOR EXAMPLE THE INCLUDES
# https://python-musicbrainzngs.readthedocs.io/en/v0.7.1/api/#getting-data # https://python-musicbrainzngs.readthedocs.io/en/v0.7.1/api/#getting-data
logger = METADATA_DOWNLOAD_LOGGER
class MetadataDownloader: class MetadataDownloader:
def __init__(self, database, logger: logging.Logger): def __init__(self):
self.database = database pass
self.logger = logger
class Artist: class Artist:
def __init__( def __init__(
self, self,
database,
logger,
musicbrainz_artistid: str, musicbrainz_artistid: str,
release_groups: List = [], release_groups: List = [],
new_release_groups: bool = True new_release_groups: bool = True
): ):
self.database = database
self.logger = logger
""" """
release_groups: list release_groups: list
""" """
@ -61,8 +51,8 @@ class MetadataDownloader:
for i, release_group in enumerate(release_groups): for i, release_group in enumerate(release_groups):
self.release_groups.append(MetadataDownloader.ReleaseGroup( self.release_groups.append(MetadataDownloader.ReleaseGroup(
self.database, database,
self.logger, logger,
musicbrainz_releasegroupid=release_group['id'], musicbrainz_releasegroupid=release_group['id'],
artists=[self], artists=[self],
albumsort=i + 1 albumsort=i + 1
@ -73,8 +63,8 @@ class MetadataDownloader:
return f"artist: \"{self.artist}\"" return f"artist: \"{self.artist}\""
def save(self): def save(self):
self.logger.info(f"caching {self}") logger.info(f"caching {self}")
self.database.add_artist( database.add_artist(
musicbrainz_artistid=self.musicbrainz_artistid, musicbrainz_artistid=self.musicbrainz_artistid,
artist=self.artist artist=self.artist
) )
@ -82,16 +72,12 @@ class MetadataDownloader:
class ReleaseGroup: class ReleaseGroup:
def __init__( def __init__(
self, self,
database,
logger,
musicbrainz_releasegroupid: str, musicbrainz_releasegroupid: str,
artists=[], artists=[],
albumsort: int = None, albumsort: int = None,
only_download_distinct_releases: bool = True, only_download_distinct_releases: bool = True,
fetch_further: bool = True fetch_further: bool = True
): ):
self.database = database
self.logger = logger
""" """
split_artists: list -> if len > 1: album_artist=VariousArtists split_artists: list -> if len > 1: album_artist=VariousArtists
releases: list releases: list
@ -137,8 +123,8 @@ class MetadataDownloader:
return f"release group: \"{self.name}\"" return f"release group: \"{self.name}\""
def save(self): def save(self):
self.logger.info(f"caching {self}") logger.info(f"caching {self}")
self.database.add_release_group( database.add_release_group(
musicbrainz_releasegroupid=self.musicbrainz_releasegroupid, musicbrainz_releasegroupid=self.musicbrainz_releasegroupid,
artist_ids=[artist.musicbrainz_artistid for artist in self.artists], artist_ids=[artist.musicbrainz_artistid for artist in self.artists],
albumartist=self.albumartist, albumartist=self.albumartist,
@ -152,7 +138,7 @@ class MetadataDownloader:
for existing_artist in self.artists: for existing_artist in self.artists:
if artist_id == existing_artist.musicbrainz_artistid: if artist_id == existing_artist.musicbrainz_artistid:
return existing_artist return existing_artist
new_artist = MetadataDownloader.Artist(self.database, self.logger, artist_id, release_groups=[self], new_artist = MetadataDownloader.Artist(artist_id, release_groups=[self],
new_release_groups=False) new_release_groups=False)
self.artists.append(new_artist) self.artists.append(new_artist)
return new_artist return new_artist
@ -162,7 +148,7 @@ class MetadataDownloader:
if musicbrainz_albumid is None: if musicbrainz_albumid is None:
return return
self.releases.append( self.releases.append(
MetadataDownloader.Release(self.database, self.logger, musicbrainz_albumid, release_group=self)) MetadataDownloader.Release(musicbrainz_albumid, release_group=self))
def append_distinct_releases(self, release_datas: List[dict]): def append_distinct_releases(self, release_datas: List[dict]):
titles = {} titles = {}
@ -183,14 +169,10 @@ class MetadataDownloader:
class Release: class Release:
def __init__( def __init__(
self, self,
database,
logger,
musicbrainz_albumid: str, musicbrainz_albumid: str,
release_group=None, release_group=None,
fetch_furter: bool = True fetch_furter: bool = True
): ):
self.database = database
self.logger = logger
""" """
release_group: ReleaseGroup release_group: ReleaseGroup
tracks: list tracks: list
@ -206,9 +188,10 @@ class MetadataDownloader:
recording_datas = get_elem_from_obj(release_data, ['medium-list', 0, 'track-list'], return_if_none=[]) recording_datas = get_elem_from_obj(release_data, ['medium-list', 0, 'track-list'], return_if_none=[])
release_group_data = get_elem_from_obj(release_data, ['release-group'], return_if_none={}) release_group_data = get_elem_from_obj(release_data, ['release-group'], return_if_none={})
if self.release_group is None: if self.release_group is None:
self.release_group = MetadataDownloader.ReleaseGroup(self.database, self.logger, self.release_group = MetadataDownloader.ReleaseGroup(
musicbrainz_releasegroupid=get_elem_from_obj( musicbrainz_releasegroupid=get_elem_from_obj(
release_group_data, ['id']), fetch_further=False) release_group_data, ['id']),
fetch_further=False)
self.title = get_elem_from_obj(release_data, ['title']) self.title = get_elem_from_obj(release_data, ['title'])
self.copyright = get_elem_from_obj(label_data, [0, 'label', 'name']) self.copyright = get_elem_from_obj(label_data, [0, 'label', 'name'])
@ -228,8 +211,8 @@ class MetadataDownloader:
return f"release: {self.title} ©{self.copyright} {self.album_status}" return f"release: {self.title} ©{self.copyright} {self.album_status}"
def save(self): def save(self):
self.logger.info(f"caching {self}") logger.info(f"caching {self}")
self.database.add_release( database.add_release(
musicbrainz_albumid=self.musicbrainz_albumid, musicbrainz_albumid=self.musicbrainz_albumid,
release_group_id=self.release_group.musicbrainz_releasegroupid, release_group_id=self.release_group.musicbrainz_releasegroupid,
title=self.title, title=self.title,
@ -249,20 +232,16 @@ class MetadataDownloader:
continue continue
self.tracklist.append( self.tracklist.append(
MetadataDownloader.Track(self.database, self.logger, musicbrainz_releasetrackid, self, MetadataDownloader.Track(musicbrainz_releasetrackid, self,
track_number=str(i + 1))) track_number=str(i + 1)))
class Track: class Track:
def __init__( def __init__(
self, self,
database,
logger,
musicbrainz_releasetrackid: str, musicbrainz_releasetrackid: str,
release=None, release=None,
track_number: str = None track_number: str = None
): ):
self.database = database
self.logger = logger
""" """
release: Release release: Release
feature_artists: list feature_artists: list
@ -280,9 +259,7 @@ class MetadataDownloader:
recording_data = result['recording'] recording_data = result['recording']
release_data = get_elem_from_obj(recording_data, ['release-list', -1]) release_data = get_elem_from_obj(recording_data, ['release-list', -1])
if self.release is None: if self.release is None:
self.release = MetadataDownloader.Release(self.database, self.logger, self.release = MetadataDownloader.Release(get_elem_from_obj(release_data, ['id']), fetch_furter=False)
get_elem_from_obj(release_data, ['id']), fetch_furter=False)
for artist_data in get_elem_from_obj(recording_data, ['artist-credit'], return_if_none=[]): for artist_data in get_elem_from_obj(recording_data, ['artist-credit'], return_if_none=[]):
self.append_artist(get_elem_from_obj(artist_data, ['artist', 'id'])) self.append_artist(get_elem_from_obj(artist_data, ['artist', 'id']))
@ -296,9 +273,9 @@ class MetadataDownloader:
return f"track: \"{self.title}\" {self.isrc or ''}" return f"track: \"{self.title}\" {self.isrc or ''}"
def save(self): def save(self):
self.logger.info(f"caching {self}") logger.info(f"caching {self}")
self.database.add_track( database.add_track(
musicbrainz_releasetrackid=self.musicbrainz_releasetrackid, musicbrainz_releasetrackid=self.musicbrainz_releasetrackid,
musicbrainz_albumid=self.release.musicbrainz_albumid, musicbrainz_albumid=self.release.musicbrainz_albumid,
feature_aritsts=[artist.musicbrainz_artistid for artist in self.artists], feature_aritsts=[artist.musicbrainz_artistid for artist in self.artists],
@ -314,7 +291,7 @@ class MetadataDownloader:
for existing_artist in self.artists: for existing_artist in self.artists:
if artist_id == existing_artist.musicbrainz_artistid: if artist_id == existing_artist.musicbrainz_artistid:
return existing_artist return existing_artist
new_artist = MetadataDownloader.Artist(self.database, self.logger, artist_id, new_release_groups=False) new_artist = MetadataDownloader.Artist(artist_id, new_release_groups=False)
self.artists.append(new_artist) self.artists.append(new_artist)
return new_artist return new_artist
@ -323,42 +300,31 @@ class MetadataDownloader:
mb_id = option['id'] mb_id = option['id']
if type_ == "artist": if type_ == "artist":
self.Artist(self.database, self.logger, mb_id) return self.Artist(mb_id)
elif type_ == "release_group": if type_ == "release_group":
self.ReleaseGroup(self.database, self.logger, mb_id) return self.ReleaseGroup(mb_id)
elif type_ == "release": if type_ == "release":
self.Release(self.database, self.logger, mb_id) return self.Release(mb_id)
elif type_ == "track": if type_ == "recording":
self.Track(self.database, self.logger, mb_id) return self.Track(mb_id)
logger.error(f"download type {type_} doesn't exists :(")
if __name__ == "__main__": if __name__ == "__main__":
import tempfile logging.basicConfig(
import os level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler(os.path.join(temp_dir, LOG_FILE)),
logging.StreamHandler()
]
)
temp_folder = "music-downloader" downloader = MetadataDownloader()
temp_dir = os.path.join(tempfile.gettempdir(), temp_folder)
if not os.path.exists(temp_dir):
os.mkdir(temp_dir)
logging.basicConfig(level=logging.DEBUG) downloader.download({'id': 'd2006339-9e98-4624-a386-d503328eb854', 'type': 'recording'})
db_logger = logging.getLogger("database") downloader.download({'id': 'cdd16860-35fd-46af-bd8c-5de7b15ebc31', 'type': 'release'})
db_logger.setLevel(logging.DEBUG)
import database
database_ = database.Database(os.path.join(temp_dir, "metadata.db"),
os.path.join(temp_dir, "database_structure.sql"),
"https://raw.githubusercontent.com/HeIIow2/music-downloader/new_metadata/assets/database_structure.sql",
db_logger,
reset_anyways=False)
download_logger = logging.getLogger("metadata downloader")
download_logger.setLevel(logging.INFO)
downloader = MetadataDownloader(database_, download_logger)
downloader.download({'id': 'd2006339-9e98-4624-a386-d503328eb854', 'type': 'track'})
# downloader.download({'id': 'cdd16860-35fd-46af-bd8c-5de7b15ebc31', 'type': 'release'})
# download({'id': '4b9af532-ef7e-42ab-8b26-c466327cb5e0', 'type': 'release'}) # download({'id': '4b9af532-ef7e-42ab-8b26-c466327cb5e0', 'type': 'release'})
#download({'id': 'c24ed9e7-6df9-44de-8570-975f1a5a75d1', 'type': 'track'}) #download({'id': 'c24ed9e7-6df9-44de-8570-975f1a5a75d1', 'type': 'track'})

View File

@ -1,142 +0,0 @@
import logging
import musicbrainzngs
from metadata import options
# Raise the musicbrainzngs library logger to WARNING so its lower-level records are dropped.
mb_log = logging.getLogger("musicbrainzngs")
mb_log.setLevel(logging.WARNING)
# Register the application name, version and contact URL used for MusicBrainz requests.
musicbrainzngs.set_useragent("metadata receiver", "0.1", "https://github.com/HeIIow2/music-downloader")
# Kinds of options this module distinguishes; Search.choose() branches on these same three strings.
# NOTE(review): the constant itself is not read anywhere in the visible code.
KNOWN_KIND_OF_OPTIONS = ["artist", "release", "track"]
class Search:
    """Step-by-step browser over MusicBrainz search results.

    Every set of results is wrapped in an ``options.Options`` instance and
    pushed onto ``options_history`` so the user can step back to an earlier
    set. Result sets are always passed to ``options.Options`` in the order
    artists, releases, recordings.
    """

    def __init__(self, query: str = None, artist: str = None, temp: str = "temp"):
        """Run the initial search.

        :param query: free text searched in artists, releases and recordings.
        :param artist: artist name, used only when ``query`` is None.
        :param temp: stored on the instance; not read by this class itself.
        :raises ValueError: if neither ``query`` nor ``artist`` is given.
        """
        if query is None and artist is None:
            raise ValueError("no query provided")

        self.options_history = []
        self.current_options = None
        self.current_chosen_option = None

        self.temp = temp

        # initial search
        if query is not None:
            self.set_options(
                options.Options([musicbrainzngs.search_artists(query), musicbrainzngs.search_releases(query),
                                 musicbrainzngs.search_recordings(query)]))
        elif artist is not None:
            self.set_options(options.Options([musicbrainzngs.search_artists(artist=artist)]))

    def browse_artist(self, artist: dict, limit: int = 25):
        """Show the given artist together with its releases and recordings."""
        options_sets = [
            {"artist-list": [artist, ], "artist-count": 1},
            musicbrainzngs.browse_releases(artist=artist["id"], limit=limit),
            musicbrainzngs.browse_recordings(artist=artist["id"], limit=limit)
        ]
        return self.set_options(options.Options(options_sets))

    def browse_release(self, release: dict, limit: int = 25):
        """Show the given release together with its artists and recordings."""
        options_sets = [
            musicbrainzngs.browse_artists(release=release["id"], limit=limit),
            {"release-list": [release, ], "release-count": 1},
            musicbrainzngs.browse_recordings(release=release["id"], limit=limit)
        ]
        return self.set_options(options.Options(options_sets))

    def browse_track(self, track: dict, limit: int = 25):
        """Show the given recording together with its artists and releases."""
        options_sets = [
            musicbrainzngs.browse_artists(recording=track["id"], limit=limit),
            musicbrainzngs.browse_releases(recording=track["id"], limit=limit),
            {"recording-list": [track, ], "recording-count": 1}
        ]
        return self.set_options(options.Options(options_sets))

    def choose(self, index, limit: int = 25, ignore_limit_for_tracklist: bool = True):
        """Select the option at ``index`` and browse into it.

        :param index: position of the option inside the current options.
        :param limit: number of results requested when browsing.
        :param ignore_limit_for_tracklist: when True, releases and tracks are
            browsed with a limit of 100 instead of ``limit``.
        :return: the new options, or the unchanged current options when the
            index is rejected or the chosen option has an unknown type.
        """
        if not self.current_options.choose(index):
            return self.current_options

        self.current_chosen_option = self.current_options.get_current_option(komplex=True)
        kind = self.current_chosen_option['type']
        # The release branch used to overwrite this value with a hard-coded 100,
        # which made ignore_limit_for_tracklist=False ineffective for releases.
        tracklist_limit = 100 if ignore_limit_for_tracklist else limit

        if kind == 'artist':
            return self.browse_artist(self.current_chosen_option, limit=limit)
        if kind == 'release':
            return self.browse_release(self.current_chosen_option, limit=tracklist_limit)
        if kind == 'track':
            return self.browse_track(self.current_chosen_option, limit=tracklist_limit)

        return self.current_options

    def get_options(self):
        """Return the options that are currently shown."""
        return self.current_options

    def set_options(self, option_instance):
        """Make ``option_instance`` current, record it in the history and return it."""
        self.options_history.append(option_instance)
        self.current_options = option_instance

        return option_instance

    def get_previous_options(self):
        """Step back one entry in the history and return the then-current options.

        When there is no earlier entry the history is left untouched and the
        current options are returned, instead of raising IndexError and
        leaving an emptied history behind.
        """
        if len(self.options_history) > 1:
            self.options_history.pop()
            self.current_options = self.options_history[-1]

        return self.current_options

    options = property(fget=get_options)
def automated_demo():
    """Run a fixed sequence of choices for the query "psychonaut 4" and print each step.

    NOTE(review): this relies on ``Search.download()``, which the Search class
    in this file does not define -- confirm where it is expected to come from.
    """
    demo = Search(query="psychonaut 4")
    print(demo.options)

    first_pick = demo.choose(0)
    print(first_pick)
    demo.download()

    second_pick = demo.choose(2)
    print(second_pick)
    demo.download()

    third_pick = demo.choose(4)
    print(third_pick)
    download_result = demo.download()
    print(download_result)
def interactive_demo():
    """Browse search results interactively from the terminal.

    Commands: ``q`` quits, ``.`` prints the current options, ``..`` steps back
    to the previous options, an integer chooses that option, ``d`` downloads
    and ends the loop.

    NOTE(review): ``d`` calls ``Search.download()``, which the Search class in
    this file does not define -- confirm where it is expected to come from.
    """
    search = Search(query=input("initial query: "))
    print(search.options)
    while True:
        # str.strip() returns a new string; the result used to be thrown away,
        # so input with surrounding whitespace never matched any command.
        input_ = input(
            "d to download, q to quit, .. for previous options, . for current options, int for this element: "
        ).lower().strip()
        if input_ == "q":
            break
        if input_ == ".":
            print(search.options)
            continue
        if input_ == "..":
            print(search.get_previous_options())
            continue
        if input_.isdigit():
            print(search.choose(int(input_)))
            continue
        if input_ == "d":
            search.download()
            break
# Manual smoke test, executed only when this file is run directly.
# NOTE(review): download_release() and download_track() are not defined on the Search
# class visible in this file -- confirm where they are expected to come from.
if __name__ == "__main__":
# interactive_demo()
# automated_demo()
search = Search(query="psychonaut 4")
# search.download_release("27f00fb8-983c-4d5c-950f-51418aac55dc")
search.download_release("1aeb676f-e556-4b17-b45e-64ab69ef0375")
# for track_ in search.download_artist("c0c720b5-012f-4204-a472-981403f37b12"):
# print(track_)
# res = search.download_track("83a30323-aee1-401a-b767-b3c1bdd026c0")
# res = search.download_track("5e1ee2c5-502c-44d3-b1bc-22803441d8c6")
res = search.download_track("86b43bec-eea6-40ae-8624-c1e404204ba1")
# res = search.download_track("5cc28584-10c6-40e2-b6d4-6891e7e7c575")
# Print every key of the first result whose value is not None.
for key in res[0]:
if res[0][key] is None:
continue
print(key, res[0][key])

File diff suppressed because one or more lines are too long

View File

@ -1,18 +1,12 @@
from typing import List from typing import List
import logging
import musicbrainzngs import musicbrainzngs
try: from src.utils.shared import *
from object_handeling import get_elem_from_obj, parse_music_brainz_date from src.utils.object_handeling import get_elem_from_obj, parse_music_brainz_date
except ModuleNotFoundError: logger = SEARCH_LOGGER
from metadata.object_handeling import get_elem_from_obj, parse_music_brainz_date
mb_log = logging.getLogger("musicbrainzngs") MAX_PARAMETERS = 3
mb_log.setLevel(logging.WARNING)
musicbrainzngs.set_useragent("metadata receiver", "0.1", "https://github.com/HeIIow2/music-downloader")
MAX_PARAMATERS = 3
OPTION_TYPES = ['artist', 'release_group', 'release', 'recording'] OPTION_TYPES = ['artist', 'release_group', 'release', 'recording']
@ -45,6 +39,7 @@ class Option:
} }
return f"{type_repr[self.type]}: \"{self.name}\"{self.additional_info}" return f"{type_repr[self.type]}: \"{self.name}\"{self.additional_info}"
class MultipleOptions: class MultipleOptions:
def __init__(self, option_list: List[Option]) -> None: def __init__(self, option_list: List[Option]) -> None:
self.option_list = option_list self.option_list = option_list
@ -54,9 +49,7 @@ class MultipleOptions:
class Search: class Search:
def __init__(self, logger: logging.Logger) -> None: def __init__(self) -> None:
self.logger = logger
self.options_history = [] self.options_history = []
self.current_option: Option self.current_option: Option
@ -228,8 +221,10 @@ class Search:
return self.fetch_new_options() return self.fetch_new_options()
@staticmethod @staticmethod
def search_recording_from_text(artist: str = None, release_group: str = None, recording: str = None, query: str = None): def search_recording_from_text(artist: str = None, release_group: str = None, recording: str = None,
result = musicbrainzngs.search_recordings(artist=artist, release=release_group, recording=recording, query=query) query: str = None):
result = musicbrainzngs.search_recordings(artist=artist, release=release_group, recording=recording,
query=query)
recording_list = get_elem_from_obj(result, ['recording-list'], return_if_none=[]) recording_list = get_elem_from_obj(result, ['recording-list'], return_if_none=[])
resulting_options = [ resulting_options = [
@ -260,25 +255,26 @@ class Search:
return resulting_options return resulting_options
def search_from_text(self, artist: str = None, release_group: str = None, recording: str = None) -> MultipleOptions: def search_from_text(self, artist: str = None, release_group: str = None, recording: str = None) -> MultipleOptions:
self.logger.info(f"searching specified artist: \"{artist}\", release group: \"{release_group}\", recording: \"{recording}\"") logger.info(
f"searching specified artist: \"{artist}\", release group: \"{release_group}\", recording: \"{recording}\"")
if artist is None and release_group is None and recording is None: if artist is None and release_group is None and recording is None:
self.logger.error("either artist, release group or recording has to be set") logger.error("either artist, release group or recording has to be set")
return -1 return MultipleOptions([])
if recording is not None: if recording is not None:
self.logger.info("search for recording") logger.info("search for recording")
results = self.search_recording_from_text(artist=artist, release_group=release_group, recording=recording) results = self.search_recording_from_text(artist=artist, release_group=release_group, recording=recording)
elif release_group is not None: elif release_group is not None:
self.logger.info("search for release group") logger.info("search for release group")
results = self.search_release_group_from_text(artist=artist, release_group=release_group) results = self.search_release_group_from_text(artist=artist, release_group=release_group)
else: else:
self.logger.info("search for artist") logger.info("search for artist")
results = self.search_artist_from_text(artist=artist) results = self.search_artist_from_text(artist=artist)
return self.append_new_choices(results) return self.append_new_choices(results)
def search_from_text_unspecified(self, query: str) -> MultipleOptions: def search_from_text_unspecified(self, query: str) -> MultipleOptions:
self.logger.info(f"searching unspecified: \"{query}\"") logger.info(f"searching unspecified: \"{query}\"")
results = [] results = []
results.extend(self.search_artist_from_text(query=query)) results.extend(self.search_artist_from_text(query=query))
@ -310,8 +306,8 @@ class Search:
parameters = query.split('#') parameters = query.split('#')
parameters.remove('') parameters.remove('')
if len(parameters) > MAX_PARAMATERS: if len(parameters) > MAX_PARAMETERS:
raise ValueError(f"too many parameters. Only {MAX_PARAMATERS} are allowed") raise ValueError(f"too many parameters. Only {MAX_PARAMETERS} are allowed")
for parameter in parameters: for parameter in parameters:
splitted = parameter.split(" ") splitted = parameter.split(" ")
@ -332,7 +328,7 @@ class Search:
def automated_demo(): def automated_demo():
search = Search(logger=logger_) search = Search()
search.search_from_text(artist="I Prevail") search.search_from_text(artist="I Prevail")
# choose an artist # choose an artist
@ -346,9 +342,10 @@ def automated_demo():
def interactive_demo(): def interactive_demo():
search = Search(logger=logger_) search = Search()
while True: while True:
input_ = input("q to quit, .. for previous options, int for this element, str to search for query, ok to download: ") input_ = input(
"q to quit, .. for previous options, int for this element, str to search for query, ok to download: ")
input_.strip() input_.strip()
if input_.lower() == "ok": if input_.lower() == "ok":
break break
@ -364,8 +361,4 @@ def interactive_demo():
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
logger_ = logging.getLogger("test")
interactive_demo() interactive_demo()

0
src/scraping/__init__.py Normal file
View File

View File

@ -0,0 +1,57 @@
import os
from src.utils.shared import *
from src.utils import phonetic_compares
def is_valid(a1, a2, t1, t2) -> bool:
    """
    Compare one (artist, title) pair against another.

    :param a1: artist value of the first song
    :param a2: artist value of the second song
    :param t1: title of the first song
    :param t2: title of the second song
    :return: True only if the flags returned by both
        ``phonetic_compares.match_titles`` and
        ``phonetic_compares.match_artists`` are falsy
    """
    # Each helper returns a (flag, distance) pair; the distance is not used.
    # NOTE(review): the flags are negated here, so presumably the helpers
    # return True on a MISmatch - confirm against phonetic_compares.
    title_flag, _ = phonetic_compares.match_titles(t1, t2)
    artist_flag, _ = phonetic_compares.match_artists(a1, a2)
    return not (title_flag or artist_flag)
def get_metadata(file):
    """
    Read the 'artist' and 'title' tags of a file.

    :param file: path handed to ``EasyID3``
    :return: tuple ``(artist, title)`` as stored under those two keys
    """
    # NOTE(review): EasyID3 is not imported in the visible part of this
    # module and the star-imported shared module does not provide it either
    # - confirm where the name is supposed to come from.
    tags = EasyID3(file)
    return tags['artist'], tags['title']
def check_for_song(folder, artists, title):
    """
    Tell whether a folder already contains a given song.

    Every regular file directly inside ``folder`` has its tags read with
    ``get_metadata`` and compared to the wanted song with ``is_valid``.

    :param folder: directory to look in
    :param artists: artist value of the wanted song
    :param title: title of the wanted song
    :return: True as soon as one file is accepted by ``is_valid``; False if
        none is, or if ``folder`` is missing or is not a directory
    """
    # isdir instead of exists: a path pointing at a regular file would
    # otherwise reach os.listdir and raise NotADirectoryError.
    if not os.path.isdir(folder):
        return False

    for entry in os.listdir(folder):
        candidate = os.path.join(folder, entry)
        # Sub-directories carry no tags, so they are not given to the reader.
        if not os.path.isfile(candidate):
            continue

        artists_, title_ = get_metadata(candidate)
        if is_valid(artists, artists_, title, title_):
            return True

    return False
def get_path(row):
    """
    Print the artists, title and target folder of a track row and look for
    the song in that folder.

    :param row: mapping with at least the keys 'title', 'artists' and 'path'
    :return: always None
    """
    song_title, song_artists = row['title'], row['artists']
    target_dir = os.path.join(MUSIC_DIR, row['path'])

    print(song_artists, song_title, target_dir)

    # NOTE(review): the result of the lookup is discarded and nothing is
    # returned - this looks unfinished; confirm the intended return value.
    check_for_song(target_dir, song_artists, song_title)
if __name__ == "__main__":
    # Manual smoke test: run get_path on one hard-coded track row.
    sample_row = {
        'artists': ['Psychonaut 4'],
        'id': '6b40186b-6678-4328-a4b8-eb7c9806a9fb',
        'tracknumber': None,
        'titlesort ': None,
        'musicbrainz_releasetrackid': '6b40186b-6678-4328-a4b8-eb7c9806a9fb',
        'musicbrainz_albumid': '0d229a02-74f6-4c77-8c20-6612295870ae',
        'title': 'Sweet Decadance',
        'isrc': None,
        'album': 'Neurasthenia',
        'copyright': 'Talheim Records',
        'album_status': 'Official',
        'language': 'eng',
        'year': '2016',
        'date': '2016-10-07',
        'country': 'AT',
        'barcode': None,
        'albumartist': 'Psychonaut 4',
        'albumsort': None,
        'musicbrainz_albumtype': 'Album',
        'compilation': None,
        'album_artist_id': 'c0c720b5-012f-4204-a472-981403f37b12',
        'path': 'dsbm/Psychonaut 4/Neurasthenia',
        'file': 'dsbm/Psychonaut 4/Neurasthenia/Sweet Decadance.mp3',
        'genre': 'dsbm',
        'url': None,
        'src': None,
    }

    print(get_path(sample_row))

View File

@ -4,10 +4,8 @@ import time
import requests import requests
import bs4 import bs4
try: from src.utils.shared import *
import phonetic_compares from src.utils import phonetic_compares
except ModuleNotFoundError:
from scraping import phonetic_compares
TRIES = 5 TRIES = 5
TIMEOUT = 10 TIMEOUT = 10
@ -18,9 +16,6 @@ session.headers = {
"Connection": "keep-alive", "Connection": "keep-alive",
"Referer": "https://musify.club/" "Referer": "https://musify.club/"
} }
def set_proxy(proxies):
session.proxies = proxies session.proxies = proxies

View File

@ -4,10 +4,7 @@ import youtube_dl
import logging import logging
import time import time
try: from src.utils import phonetic_compares
import phonetic_compares
except ModuleNotFoundError:
from scraping import phonetic_compares
YDL_OPTIONS = {'format': 'bestaudio', 'noplaylist': 'True'} YDL_OPTIONS = {'format': 'bestaudio', 'noplaylist': 'True'}
YOUTUBE_URL_KEY = 'webpage_url' YOUTUBE_URL_KEY = 'webpage_url'

View File

@ -1,6 +1,10 @@
import os.path import os.path
import logging import logging
from src.utils.shared import *
logger = PATH_LOGGER
UNHIDE_CHAR = '_' UNHIDE_CHAR = '_'
def unhide(part: str): def unhide(part: str):
@ -13,15 +17,13 @@ def unhide(part: str):
class UrlPath: class UrlPath:
def __init__(self, database, logger: logging.Logger, genre: str): def __init__(self, genre: str):
self.database = database
self.logger = logger
self.genre = genre self.genre = genre
for row in self.database.get_tracks_without_filepath(): for row in database.get_tracks_without_filepath():
file, path = self.get_path_from_row(row) file, path = self.get_path_from_row(row)
self.database.set_filepath(row['id'], file, path, genre) database.set_filepath(row['id'], file, path, genre)
def get_path_from_row(self, row): def get_path_from_row(self, row):
""" """
@ -34,7 +36,8 @@ class UrlPath:
f"{self.get_song(row)}.mp3"), os.path.join(self.get_genre(), self.get_artist(row), f"{self.get_song(row)}.mp3"), os.path.join(self.get_genre(), self.get_artist(row),
self.get_album(row)) self.get_album(row))
def escape_part(self, part: str): @staticmethod
def escape_part(part: str):
return unhide(part.replace("/", " ")) return unhide(part.replace("/", " "))
def get_genre(self): def get_genre(self):

2
src/utils/__init__.py Normal file
View File

@ -0,0 +1,2 @@
# Submodules of this package that a star-import of the package
# (`from src.utils import *`) pulls in.
__all__ = ["shared", "object_handeling", "phonetic_compares"]

44
src/utils/shared.py Normal file
View File

@ -0,0 +1,44 @@
import musicbrainzngs
import logging
import tempfile
import os
from src.metadata.database import Database
# Name of the working directory (created inside the system temp directory)
# and names of the files used by the program.
TEMP_FOLDER = "music-downloader"
LOG_FILE = "download_logs.log"
DATABASE_FILE = "metadata.db"
DATABASE_STRUCTURE_FILE = "database_structure.sql"
DATABASE_STRUCTURE_FALLBACK = "https://raw.githubusercontent.com/HeIIow2/music-downloader/new_metadata/assets/database_structure.sql"

# Shared logger objects, one per part of the program.
SEARCH_LOGGER = logging.getLogger("mb-cli")
DATABASE_LOGGER = logging.getLogger("database")
METADATA_DOWNLOAD_LOGGER = logging.getLogger("metadata-download")
URL_DOWNLOAD_LOGGER = logging.getLogger("ling-download")
PATH_LOGGER = logging.getLogger("create-paths")
DOWNLOAD_LOGGER = logging.getLogger("download")
GENIUS_LOGGER = logging.getLogger("genius")

NOT_A_GENRE = ".", "..", "misc_scripts", "Music", "script", ".git", ".idea"
MUSIC_DIR = os.path.expanduser('~/Music')

# Make sure the working directory exists. exist_ok avoids the gap between
# checking for the directory and creating it.
temp_dir = os.path.join(tempfile.gettempdir(), TEMP_FOLDER)
os.makedirs(temp_dir, exist_ok=True)

# Only let warnings and worse through from the musicbrainzngs library.
# setLevel() returns None, so the logger is bound first and configured
# afterwards; mb_log now really is the logger object.
mb_log = logging.getLogger("musicbrainzngs")
mb_log.setLevel(logging.WARNING)
musicbrainzngs.set_useragent("metadata receiver", "0.1", "https://github.com/HeIIow2/music-downloader")

# Single database object shared by every module that imports this one.
# NOTE(review): reset_anyways=True is passed on every import - presumably
# this rebuilds the database on each run; confirm that this is intended.
database = Database(os.path.join(temp_dir, DATABASE_FILE),
                    os.path.join(temp_dir, DATABASE_STRUCTURE_FILE),
                    DATABASE_STRUCTURE_FALLBACK,
                    DATABASE_LOGGER,
                    reset_anyways=True)

# Proxy settings; empty unless TOR is switched on.
# NOTE(review): 127.0.0.1:9150 is presumably a locally running Tor SOCKS
# proxy - confirm the port for the target setup.
TOR = False
proxies = {
    'http': 'socks5h://127.0.0.1:9150',
    'https': 'socks5h://127.0.0.1:9150'
} if TOR else {}