completed new cli
This commit is contained in:
@@ -5,15 +5,17 @@ from ..objects import DatabaseObject, Source
|
||||
from ..utils.enums.source import SourcePages
|
||||
from ..utils.support_classes import Query, DownloadResult
|
||||
from ..utils.exception.download import UrlNotFoundException
|
||||
from ..pages import Page, EncyclopaediaMetallum, Musify, INDEPENDENT_DB_OBJECTS
|
||||
from ..pages import Page, EncyclopaediaMetallum, Musify, YouTube, INDEPENDENT_DB_OBJECTS
|
||||
|
||||
ALL_PAGES: Set[Type[Page]] = {
|
||||
EncyclopaediaMetallum,
|
||||
Musify
|
||||
Musify,
|
||||
YouTube,
|
||||
}
|
||||
|
||||
AUDIO_PAGES: Set[Type[Page]] = {
|
||||
Musify,
|
||||
YouTube,
|
||||
}
|
||||
|
||||
SHADY_PAGES: Set[Type[Page]] = {
|
||||
|
@@ -1,2 +0,0 @@
|
||||
|
||||
|
@@ -1,106 +0,0 @@
|
||||
from typing import List
|
||||
import mutagen.id3
|
||||
import requests
|
||||
import os.path
|
||||
from mutagen.easyid3 import EasyID3
|
||||
from pydub import AudioSegment
|
||||
|
||||
from ..utils.shared import *
|
||||
from .sources import (
|
||||
youtube,
|
||||
musify,
|
||||
local_files
|
||||
)
|
||||
from ..database.song import (
|
||||
Song as song_object,
|
||||
Target as target_object,
|
||||
Source as source_object
|
||||
)
|
||||
from ..database.temp_database import temp_database
|
||||
|
||||
logger = DOWNLOAD_LOGGER
|
||||
|
||||
# maps the classes to get data from to the source name
|
||||
sources = {
|
||||
'Youtube': youtube.Youtube,
|
||||
'Musify': musify.Musify
|
||||
}
|
||||
|
||||
"""
|
||||
https://en.wikipedia.org/wiki/ID3
|
||||
https://mutagen.readthedocs.io/en/latest/user/id3.html
|
||||
|
||||
# to get all valid keys
|
||||
from mutagen.easyid3 import EasyID3
|
||||
print("\n".join(EasyID3.valid_keys.keys()))
|
||||
print(EasyID3.valid_keys.keys())
|
||||
"""
|
||||
|
||||
|
||||
class Download:
    """Download the audio of songs from their known sources and tag the files.

    Instantiating the class immediately processes every track that
    ``temp_database.get_tracks_to_download()`` returns.
    """

    def __init__(self):
        Download.fetch_audios(temp_database.get_tracks_to_download())

    @classmethod
    def fetch_audios(cls, songs: List[song_object], override_existing: bool = False):
        """Download each song from the first source that works, then tag it.

        :param songs: the songs to process; each needs ``target`` and ``sources``
        :param override_existing: download even if the target file already exists
        """
        for song in songs:
            # path_stuff() returns False when the target file is already there;
            # in that case only the metadata gets refreshed.
            if not cls.path_stuff(song.target) and not override_existing:
                cls.write_metadata(song)
                continue

            is_downloaded = False
            for source in song.sources:
                download_success = cls.download_from_src(song, source)

                if download_success == -1:
                    # NOTE(review): this formats fields of `song`, although the
                    # loop iterates over `source`; looks stale - confirm that
                    # Song still supports item access for 'url' and 'src'.
                    logger.warning(f"couldn't download {song['url']} from {song['src']}")
                else:
                    is_downloaded = True
                    break

            if is_downloaded:
                cls.write_metadata(song)

    @classmethod
    def download_from_src(cls, song: song_object, source: source_object):
        """Delegate the download to the page class registered for ``source.src``.

        :raises ValueError: if ``source.src`` is not a key of ``sources``
        :return: whatever the page class' ``fetch_audio`` returns
        """
        if source.src not in sources:
            raise ValueError(f"source {source.src} seems to not exist")
        source_subclass = sources[source.src]

        return source_subclass.fetch_audio(song, source)

    @classmethod
    def write_metadata(cls, song: song_object):
        """Write the tags from ``song.get_metadata()`` into the target file.

        :return: ``False`` if the target file does not exist, otherwise ``None``
        """
        if not os.path.exists(song.target.file):
            logger.warning(f"file {song.target.file} doesn't exist")
            return False

        # only convert the file to the proper format if mutagen doesn't work with it due to time
        try:
            audiofile = EasyID3(song.target.file)
        except mutagen.id3.ID3NoHeaderError:
            AudioSegment.from_file(song.target.file).export(song.target.file, format="mp3")
            audiofile = EasyID3(song.target.file)

        for key, value in song.get_metadata():
            # Lists (including list subclasses) are passed through untouched;
            # every other value is stringified before it is assigned.
            if not isinstance(value, list):
                value = str(value)
            audiofile[key] = value

        logger.info("saving")
        audiofile.save(song.target.file, v1=2)

    @classmethod
    def path_stuff(cls, target: target_object) -> bool:
        """Prepare the target directory.

        :return: ``True`` if the file should be downloaded, ``False`` if it
            already exists on disk
        """
        if os.path.exists(target.file):
            logger.info(f"'{target.file}' does already exist, thus not downloading.")
            return False
        os.makedirs(target.path, exist_ok=True)
        return True
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Manual entry point: enable verbose logging and process all pending tracks.
    # The previously created requests.Session() was never used nor closed,
    # so it is not created any more.
    logging.basicConfig(level=logging.DEBUG)
    Download()
|
@@ -1,70 +0,0 @@
|
||||
from typing import List
|
||||
|
||||
from ..utils.shared import *
|
||||
from .sources import (
|
||||
youtube,
|
||||
musify,
|
||||
local_files
|
||||
)
|
||||
from ..database.song import Song as song_object
|
||||
from ..database.temp_database import temp_database
|
||||
|
||||
logger = URL_DOWNLOAD_LOGGER
|
||||
|
||||
# maps the classes to get data from to the source name
|
||||
sources = {
|
||||
'Youtube': youtube.Youtube,
|
||||
'Musify': musify.Musify
|
||||
}
|
||||
|
||||
|
||||
class Download:
    """Look up download links for songs and store them in the temp database.

    Instantiating the class processes every track returned by
    ``temp_database.get_tracks_without_src()``.
    """

    def __init__(self) -> None:
        for song in temp_database.get_tracks_without_src():
            id_ = song['id']
            if os.path.exists(song.target.file):
                logger.info(f"skipping the fetching of the download links, cuz {song.target.file} already exists.")
                continue

            found_any = False
            for src in AUDIO_SOURCES:
                link = Download.fetch_from_src(song, src)
                if link is None:
                    continue
                found_any = True
                Download.add_url(link, src, id_)

            if found_any:
                continue
            logger.warning(f"Didn't find any sources for {song}")

    @classmethod
    def fetch_sources(cls, songs: List[song_object], skip_existing_files: bool = False):
        """Query every audio source for each song and store the links found.

        :param songs: the songs to find download links for
        :param skip_existing_files: skip songs whose target already exists on disc
        """
        for song in songs:
            if song.target.exists_on_disc and skip_existing_files:
                logger.info(f"skipping the fetching of the download links, cuz {song.target.file} already exists.")
                continue

            found_any = False
            for src in AUDIO_SOURCES:
                link = cls.fetch_from_src(song, src)
                if link is None:
                    continue
                found_any = True
                cls.add_url(link, src, song.id)

            if found_any:
                continue
            logger.warning(f"Didn't find any sources for {song}")

    @classmethod
    def fetch_from_src(cls, song, src):
        """Ask the page class registered under ``src`` for the song's source.

        :raises ValueError: if ``src`` is not a key of ``sources``
        """
        if src not in sources:
            raise ValueError(f"source {src} seems to not exist")

        page_class = sources[src]
        return page_class.fetch_source(song)

    @classmethod
    def add_url(cls, url: str, src: str, id_: str):
        """Store the download ``url`` and its source name for the song ``id_``."""
        temp_database.set_download_data(id_, url, src)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
download = Download()
|
@@ -1,7 +0,0 @@
|
||||
from . import (
|
||||
metadata_search,
|
||||
metadata_fetch
|
||||
)
|
||||
|
||||
MetadataSearch = metadata_search.Search
|
||||
MetadataDownload = metadata_fetch.MetadataDownloader
|
@@ -1,345 +0,0 @@
|
||||
from src.music_kraken.utils.shared import *
|
||||
from src.music_kraken.utils.object_handeling import get_elem_from_obj, parse_music_brainz_date
|
||||
|
||||
from src.music_kraken.database.temp_database import temp_database
|
||||
|
||||
from typing import List
|
||||
import musicbrainzngs
|
||||
import logging
|
||||
|
||||
# I don't know if it would be feasible to set up my own MusicBrainz instance
|
||||
# https://github.com/metabrainz/musicbrainz-docker
|
||||
|
||||
|
||||
# IMPORTANT DOCUMENTATION WHICH CONTAINS FOR EXAMPLE THE INCLUDES
|
||||
# https://python-musicbrainzngs.readthedocs.io/en/v0.7.1/api/#getting-data
|
||||
|
||||
logger = METADATA_DOWNLOAD_LOGGER
|
||||
|
||||
|
||||
class MetadataDownloader:
    """Fetch metadata from MusicBrainz and cache it in ``temp_database``.

    The nested classes mirror the MusicBrainz entities. Constructing one of
    them performs the network request, caches the result through ``save()``
    and, unless told otherwise, recursively constructs its child entities.
    If the request fails with a ``NetworkError`` the constructor logs a
    warning and returns early with a partially initialised object.
    """

    def __init__(self):
        pass

    class Artist:
        def __init__(
                self,
                musicbrainz_artistid: str,
                release_groups: List = None,
                new_release_groups: bool = True
        ):
            """
            :param musicbrainz_artistid: MusicBrainz id of the artist
            :param release_groups: already known release groups of the artist;
                a fresh list is created when omitted
            :param new_release_groups: whether to fetch all release groups too
            """
            # A `[]` default would be shared (and appended to) by every artist.
            self.release_groups = [] if release_groups is None else release_groups

            self.musicbrainz_artistid = musicbrainz_artistid
            # Defined up front so that __str__ still works after a network failure.
            self.artist = None

            try:
                result = musicbrainzngs.get_artist_by_id(self.musicbrainz_artistid, includes=["release-groups", "releases"])
            except musicbrainzngs.musicbrainz.NetworkError:
                logger.warning(f"network error while fetching artist {self.musicbrainz_artistid}")
                return
            artist_data = get_elem_from_obj(result, ['artist'], return_if_none={})

            self.artist = get_elem_from_obj(artist_data, ['name'])

            self.save()

            # STARTING TO FETCH RELEASE GROUPS. IMPORTANT: DON'T WRITE ANYTHING BESIDES THAT HERE
            if not new_release_groups:
                return
            # sort all release groups by date and add album sort to have them in chronological order.
            # Looked up with a fallback because artist_data may be the empty fallback dict.
            fetched_groups = get_elem_from_obj(artist_data, ['release-group-list'], return_if_none=[])
            for release_group in fetched_groups:
                release_group['first-release-date'] = parse_music_brainz_date(release_group['first-release-date'])
            fetched_groups.sort(key=lambda x: x['first-release-date'])

            for i, release_group in enumerate(fetched_groups):
                self.release_groups.append(MetadataDownloader.ReleaseGroup(
                    musicbrainz_releasegroupid=release_group['id'],
                    artists=[self],
                    albumsort=i + 1
                ))

        def __str__(self):
            return f"artist: \"{self.artist}\""

        def save(self):
            """Cache the artist in the temp database."""
            logger.info(f"caching {self}")
            temp_database.add_artist(
                musicbrainz_artistid=self.musicbrainz_artistid,
                artist=self.artist
            )

    class ReleaseGroup:
        def __init__(
                self,
                musicbrainz_releasegroupid: str,
                artists=None,
                albumsort: int = None,
                only_download_distinct_releases: bool = True,
                fetch_further: bool = True
        ):
            """
            :param musicbrainz_releasegroupid: MusicBrainz id of the release group
            :param artists: already known artists; a fresh list is created when
                omitted. With more than one artist the album artist becomes
                "Various Artists".
            :param albumsort: chronological position within the artist's releases
            :param only_download_distinct_releases: fetch only one release per title
            :param fetch_further: whether to fetch the releases of the group
            """
            self.musicbrainz_releasegroupid = musicbrainz_releasegroupid
            # A `[]` default would be shared by all release groups, and
            # append_artist() mutates this list.
            self.artists = [] if artists is None else artists
            self.releases = []
            # Defined up front so that __str__ still works after a network failure.
            self.name = None

            try:
                result = musicbrainzngs.get_release_group_by_id(musicbrainz_releasegroupid,
                                                                includes=["artist-credits", "releases"])
            except musicbrainzngs.musicbrainz.NetworkError:
                logger.warning(f"network error while fetching release group {musicbrainz_releasegroupid}")
                return
            release_group_data = get_elem_from_obj(result, ['release-group'], return_if_none={})
            artist_datas = get_elem_from_obj(release_group_data, ['artist-credit'], return_if_none=[])
            release_datas = get_elem_from_obj(release_group_data, ['release-list'], return_if_none=[])

            # only for printing the release
            self.name = get_elem_from_obj(release_group_data, ['title'])

            for artist_data in artist_datas:
                artist_id = get_elem_from_obj(artist_data, ['artist', 'id'])
                if artist_id is None:
                    continue
                self.append_artist(artist_id)

            if self.artists:
                self.albumartist = "Various Artists" if len(self.artists) > 1 else self.artists[0].artist
                self.album_artist_id = None if self.albumartist == "Various Artists" else self.artists[
                    0].musicbrainz_artistid
            else:
                # No artist could be resolved; indexing artists[0] would raise.
                self.albumartist = None
                self.album_artist_id = None

            self.albumsort = albumsort
            self.musicbrainz_albumtype = get_elem_from_obj(release_group_data, ['primary-type'])
            self.compilation = "1" if self.musicbrainz_albumtype == "Compilation" else None

            self.save()

            if not fetch_further:
                return

            if only_download_distinct_releases:
                self.append_distinct_releases(release_datas)
            else:
                self.append_all_releases(release_datas)

        def __str__(self):
            return f"release group: \"{self.name}\""

        def save(self):
            """Cache the release group in the temp database."""
            logger.info(f"caching {self}")
            temp_database.add_release_group(
                musicbrainz_releasegroupid=self.musicbrainz_releasegroupid,
                artist_ids=[artist.musicbrainz_artistid for artist in self.artists],
                albumartist=self.albumartist,
                albumsort=self.albumsort,
                musicbrainz_albumtype=self.musicbrainz_albumtype,
                compilation=self.compilation,
                album_artist_id=self.album_artist_id
            )

        def append_artist(self, artist_id: str):
            """Return the artist with ``artist_id``, fetching it if not yet known."""
            for existing_artist in self.artists:
                if artist_id == existing_artist.musicbrainz_artistid:
                    return existing_artist
            new_artist = MetadataDownloader.Artist(artist_id, release_groups=[self],
                                                   new_release_groups=False)
            self.artists.append(new_artist)
            return new_artist

        def append_release(self, release_data: dict):
            """Fetch the release described by ``release_data``; skipped without an id."""
            musicbrainz_albumid = get_elem_from_obj(release_data, ['id'])
            if musicbrainz_albumid is None:
                return
            self.releases.append(
                MetadataDownloader.Release(musicbrainz_albumid, release_group=self))

        def append_distinct_releases(self, release_datas: List[dict]):
            """Fetch one release per distinct title (the last one listed wins)."""
            titles = {}

            for release_data in release_datas:
                title = get_elem_from_obj(release_data, ['title'])
                if title is None:
                    continue
                titles[title] = release_data

            for release_data in titles.values():
                self.append_release(release_data)

        def append_all_releases(self, release_datas: List[dict]):
            """Fetch every release in ``release_datas``."""
            for release_data in release_datas:
                self.append_release(release_data)

    class Release:
        def __init__(
                self,
                musicbrainz_albumid: str,
                release_group=None,
                fetch_furter: bool = True
        ):
            """
            :param musicbrainz_albumid: MusicBrainz id of the release
            :param release_group: the owning ReleaseGroup; fetched when omitted
            :param fetch_furter: whether to fetch the recordings of the release
            """
            self.musicbrainz_albumid = musicbrainz_albumid
            self.release_group = release_group
            self.tracklist = []
            # Defined up front so that __str__ still works after a network failure.
            self.title = None
            self.copyright = None
            self.album_status = None

            try:
                result = musicbrainzngs.get_release_by_id(self.musicbrainz_albumid,
                                                          includes=["recordings", "labels", "release-groups"])
            except musicbrainzngs.musicbrainz.NetworkError:
                logger.warning(f"network error while fetching release {self.musicbrainz_albumid}")
                return
            release_data = get_elem_from_obj(result, ['release'], return_if_none={})
            label_data = get_elem_from_obj(release_data, ['label-info-list'], return_if_none={})
            # only the track list of the first medium is read
            recording_datas = get_elem_from_obj(release_data, ['medium-list', 0, 'track-list'], return_if_none=[])
            release_group_data = get_elem_from_obj(release_data, ['release-group'], return_if_none={})
            if self.release_group is None:
                self.release_group = MetadataDownloader.ReleaseGroup(
                    musicbrainz_releasegroupid=get_elem_from_obj(
                        release_group_data, ['id']),
                    fetch_further=False)

            self.title = get_elem_from_obj(release_data, ['title'])
            self.copyright = get_elem_from_obj(label_data, [0, 'label', 'name'])

            self.album_status = get_elem_from_obj(release_data, ['status'])
            self.language = get_elem_from_obj(release_data, ['text-representation', 'language'])
            self.year = get_elem_from_obj(release_data, ['date'], lambda x: x.split("-")[0])
            self.date = get_elem_from_obj(release_data, ['date'])
            self.country = get_elem_from_obj(release_data, ['country'])
            self.barcode = get_elem_from_obj(release_data, ['barcode'])

            self.save()
            if fetch_furter:
                self.append_recordings(recording_datas)

        def __str__(self):
            return f"release: {self.title} ©{self.copyright} {self.album_status}"

        def save(self):
            """Cache the release in the temp database."""
            logger.info(f"caching {self}")
            temp_database.add_release(
                musicbrainz_albumid=self.musicbrainz_albumid,
                release_group_id=self.release_group.musicbrainz_releasegroupid,
                title=self.title,
                copyright_=self.copyright,
                album_status=self.album_status,
                language=self.language,
                year=self.year,
                date=self.date,
                country=self.country,
                barcode=self.barcode
            )

        def append_recordings(self, recording_datas: dict):
            """Fetch every recording; the track number is the 1-based list position."""
            for i, recording_data in enumerate(recording_datas):
                musicbrainz_releasetrackid = get_elem_from_obj(recording_data, ['recording', 'id'])
                if musicbrainz_releasetrackid is None:
                    continue

                self.tracklist.append(
                    MetadataDownloader.Track(musicbrainz_releasetrackid, self,
                                             track_number=str(i + 1)))

    class Track:
        def __init__(
                self,
                musicbrainz_releasetrackid: str,
                release=None,
                track_number: str = None
        ):
            """
            :param musicbrainz_releasetrackid: MusicBrainz id of the recording
            :param release: the owning Release; when omitted the last entry of
                the recording's release list is fetched
            :param track_number: position of the track on the release
            """
            self.musicbrainz_releasetrackid = musicbrainz_releasetrackid
            self.release = release
            self.artists = []

            self.track_number = track_number
            # Defined up front so that __str__ still works after a network failure.
            self.title = None
            self.isrc = None
            # Misspelled attribute name kept for backwards compatibility.
            self.lenth = None

            try:
                result = musicbrainzngs.get_recording_by_id(self.musicbrainz_releasetrackid,
                                                            includes=["artists", "releases", "recording-rels", "isrcs",
                                                                      "work-level-rels"])
            except musicbrainzngs.musicbrainz.NetworkError:
                logger.warning(f"network error while fetching recording {self.musicbrainz_releasetrackid}")
                return
            recording_data = result['recording']
            release_data = get_elem_from_obj(recording_data, ['release-list', -1])
            if self.release is None:
                self.release = MetadataDownloader.Release(get_elem_from_obj(release_data, ['id']), fetch_furter=False)

            for artist_data in get_elem_from_obj(recording_data, ['artist-credit'], return_if_none=[]):
                self.append_artist(get_elem_from_obj(artist_data, ['artist', 'id']))

            self.isrc = get_elem_from_obj(recording_data, ['isrc-list', 0])
            self.title = recording_data['title']

            self.lenth = get_elem_from_obj(recording_data, ['length'])

            self.save()

        def __str__(self):
            return f"track: \"{self.title}\" {self.isrc or ''}"

        def save(self):
            """Cache the track in the temp database."""
            logger.info(f"caching {self}")

            # Without this guard int(None) raised a TypeError for recordings
            # that carry no length.
            # NOTE(review): assumes add_track accepts length=None - confirm.
            length = None if self.lenth is None else int(self.lenth)

            temp_database.add_track(
                musicbrainz_releasetrackid=self.musicbrainz_releasetrackid,
                musicbrainz_albumid=self.release.musicbrainz_albumid,
                feature_aritsts=[artist.musicbrainz_artistid for artist in self.artists],
                tracknumber=self.track_number,
                track=self.title,
                isrc=self.isrc,
                length=length
            )

        def append_artist(self, artist_id: str):
            """Return the artist with ``artist_id``, fetching it if not yet known."""
            if artist_id is None:
                return

            for existing_artist in self.artists:
                if artist_id == existing_artist.musicbrainz_artistid:
                    return existing_artist
            new_artist = MetadataDownloader.Artist(artist_id, new_release_groups=False)
            self.artists.append(new_artist)
            return new_artist

    def download(self, option: dict):
        """Fetch the entity described by ``option``.

        :param option: needs the keys ``'type'`` (``artist``, ``release_group``,
            ``release`` or ``recording``) and ``'id'`` (the MusicBrainz id)
        :return: the constructed entity, or ``None`` for an unknown type
        """
        type_ = option['type']
        mb_id = option['id']

        if type_ == "artist":
            return self.Artist(mb_id)
        if type_ == "release_group":
            return self.ReleaseGroup(mb_id)
        if type_ == "release":
            return self.Release(mb_id)
        if type_ == "recording":
            return self.Track(mb_id)

        logger.error(f"download type {type_} doesn't exists :(")
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Manual smoke test: log to both the log file and the console, then
    # fetch one recording and one release.
    log_handlers = [
        logging.FileHandler(os.path.join(TEMP_DIR, LOG_FILE)),
        logging.StreamHandler()
    ]
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=log_handlers
    )

    downloader = MetadataDownloader()

    demo_options = (
        {'id': 'd2006339-9e98-4624-a386-d503328eb854', 'type': 'recording'},
        {'id': 'cdd16860-35fd-46af-bd8c-5de7b15ebc31', 'type': 'release'},
    )
    for demo_option in demo_options:
        downloader.download(demo_option)
    # further ids that were used for manual testing:
    # {'id': '4b9af532-ef7e-42ab-8b26-c466327cb5e0', 'type': 'release'}
    # {'id': 'c24ed9e7-6df9-44de-8570-975f1a5a75d1', 'type': 'track'}
|
@@ -1,364 +0,0 @@
|
||||
from typing import List
|
||||
import musicbrainzngs
|
||||
|
||||
from src.music_kraken.utils.shared import *
|
||||
from src.music_kraken.utils.object_handeling import get_elem_from_obj, parse_music_brainz_date
|
||||
|
||||
logger = SEARCH_LOGGER
|
||||
|
||||
MAX_PARAMETERS = 3
|
||||
OPTION_TYPES = ['artist', 'release_group', 'release', 'recording']
|
||||
|
||||
|
||||
class Option:
    """A single selectable search result (artist, release group, release or recording)."""

    # column-aligned labels used by __repr__
    _TYPE_LABELS = {
        'artist': 'artist\t\t',
        'release_group': 'release group\t',
        'release': 'release\t\t',
        'recording': 'recording\t'
    }

    def __init__(self, type_: str, id_: str, name: str, additional_info: str = "") -> None:
        """
        :param type_: one of ``OPTION_TYPES``
        :param id_: id of the entity
        :param name: display name
        :param additional_info: text appended to the representation
        :raises ValueError: if ``type_`` is not in ``OPTION_TYPES``
        """
        if type_ not in OPTION_TYPES:
            raise ValueError(f"type: {type_} doesn't exist. Legal Values: {OPTION_TYPES}")

        self.type = type_
        self.name = name
        self.id = id_
        self.additional_info = additional_info

    def __getitem__(self, item):
        """Dict-style access to ``id``, ``type`` (alias ``kind``) and ``name``."""
        fields = dict(id=self.id, type=self.type, kind=self.type, name=self.name)
        return fields[item]

    def __repr__(self) -> str:
        label = self._TYPE_LABELS[self.type]
        return f"{label}: \"{self.name}\"{self.additional_info}"
|
||||
|
||||
|
||||
class MultipleOptions:
    """An ordered list of options that renders as a numbered menu."""

    def __init__(self, option_list: "List[Option]") -> None:
        self.option_list = option_list

    def __repr__(self) -> str:
        # one line per option, prefixed with its zero-padded two-digit index
        menu_lines = []
        for position, choice in enumerate(self.option_list):
            menu_lines.append(f"{position:02d}) {choice!r}")
        return "\n".join(menu_lines)
|
||||
|
||||
|
||||
class Search:
|
||||
def __init__(self) -> None:
|
||||
self.options_history = []
|
||||
self.current_option: Option
|
||||
|
||||
def append_new_choices(self, new_choices: List[Option]) -> MultipleOptions:
|
||||
self.options_history.append(new_choices)
|
||||
return MultipleOptions(new_choices)
|
||||
|
||||
def get_previous_options(self):
|
||||
self.options_history.pop(-1)
|
||||
return MultipleOptions(self.options_history[-1])
|
||||
|
||||
@staticmethod
|
||||
def fetch_new_options_from_artist(artist: Option):
|
||||
"""
|
||||
returning list of artist and every release group
|
||||
"""
|
||||
result = musicbrainzngs.get_artist_by_id(artist.id, includes=["release-groups", "releases"])
|
||||
artist_data = get_elem_from_obj(result, ['artist'], return_if_none={})
|
||||
|
||||
result = [artist]
|
||||
|
||||
# sort all release groups by date and add album sort to have them in chronological order.
|
||||
release_group_list = artist_data['release-group-list']
|
||||
for i, release_group in enumerate(release_group_list):
|
||||
release_group_list[i]['first-release-date'] = parse_music_brainz_date(release_group['first-release-date'])
|
||||
release_group_list.sort(key=lambda x: x['first-release-date'])
|
||||
release_group_list = [Option("release_group", get_elem_from_obj(release_group_, ['id']),
|
||||
get_elem_from_obj(release_group_, ['title']),
|
||||
additional_info=f" ({get_elem_from_obj(release_group_, ['type'])}) from {get_elem_from_obj(release_group_, ['first-release-date'])}")
|
||||
for release_group_ in release_group_list]
|
||||
|
||||
result.extend(release_group_list)
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def fetch_new_options_from_release_group(release_group: Option):
|
||||
"""
|
||||
returning list including the artists, the releases and the tracklist of the first release
|
||||
"""
|
||||
results = []
|
||||
|
||||
result = musicbrainzngs.get_release_group_by_id(release_group.id,
|
||||
includes=["artist-credits", "releases"])
|
||||
release_group_data = get_elem_from_obj(result, ['release-group'], return_if_none={})
|
||||
artist_datas = get_elem_from_obj(release_group_data, ['artist-credit'], return_if_none={})
|
||||
release_datas = get_elem_from_obj(release_group_data, ['release-list'], return_if_none={})
|
||||
|
||||
# appending all the artists to results
|
||||
for artist_data in artist_datas:
|
||||
results.append(Option('artist', get_elem_from_obj(artist_data, ['artist', 'id']),
|
||||
get_elem_from_obj(artist_data, ['artist', 'name'])))
|
||||
|
||||
# appending initial release group
|
||||
results.append(release_group)
|
||||
|
||||
# appending all releases
|
||||
first_release = None
|
||||
for i, release_data in enumerate(release_datas):
|
||||
results.append(
|
||||
Option('release', get_elem_from_obj(release_data, ['id']), get_elem_from_obj(release_data, ['title']),
|
||||
additional_info=f" ({get_elem_from_obj(release_data, ['status'])})"))
|
||||
if i == 0:
|
||||
first_release = results[-1]
|
||||
|
||||
# append tracklist of first release
|
||||
if first_release is not None:
|
||||
results.extend(Search.fetch_new_options_from_release(first_release, only_tracklist=True))
|
||||
|
||||
return results
|
||||
|
||||
@staticmethod
|
||||
def fetch_new_options_from_release(release: Option, only_tracklist: bool = False):
|
||||
"""
|
||||
artists
|
||||
release group
|
||||
release
|
||||
tracklist
|
||||
"""
|
||||
results = []
|
||||
result = musicbrainzngs.get_release_by_id(release.id,
|
||||
includes=["recordings", "labels", "release-groups", "artist-credits"])
|
||||
release_data = get_elem_from_obj(result, ['release'], return_if_none={})
|
||||
label_data = get_elem_from_obj(release_data, ['label-info-list'], return_if_none={})
|
||||
recording_datas = get_elem_from_obj(release_data, ['medium-list', 0, 'track-list'], return_if_none=[])
|
||||
release_group_data = get_elem_from_obj(release_data, ['release-group'], return_if_none={})
|
||||
artist_datas = get_elem_from_obj(release_data, ['artist-credit'], return_if_none={})
|
||||
|
||||
# appending all the artists to results
|
||||
for artist_data in artist_datas:
|
||||
results.append(Option('artist', get_elem_from_obj(artist_data, ['artist', 'id']),
|
||||
get_elem_from_obj(artist_data, ['artist', 'name'])))
|
||||
|
||||
# appending the according release group
|
||||
results.append(Option("release_group", get_elem_from_obj(release_group_data, ['id']),
|
||||
get_elem_from_obj(release_group_data, ['title']),
|
||||
additional_info=f" ({get_elem_from_obj(release_group_data, ['type'])}) from {get_elem_from_obj(release_group_data, ['first-release-date'])}"))
|
||||
|
||||
# appending the release
|
||||
results.append(release)
|
||||
|
||||
# appending the tracklist, but first putting it in a list, in case of only_tracklist being True to
|
||||
# return this instead
|
||||
tracklist = []
|
||||
for i, recording_data in enumerate(recording_datas):
|
||||
recording_data = recording_data['recording']
|
||||
tracklist.append(Option('recording', get_elem_from_obj(recording_data, ['id']),
|
||||
get_elem_from_obj(recording_data, ['title']),
|
||||
f" ({get_elem_from_obj(recording_data, ['length'])}) from {get_elem_from_obj(recording_data, ['artist-credit-phrase'])}"))
|
||||
|
||||
if only_tracklist:
|
||||
return tracklist
|
||||
results.extend(tracklist)
|
||||
return results
|
||||
|
||||
@staticmethod
|
||||
def fetch_new_options_from_record(recording: Option):
|
||||
"""
|
||||
artists, release, record
|
||||
"""
|
||||
results = []
|
||||
|
||||
result = musicbrainzngs.get_recording_by_id(recording.id, includes=["artists", "releases"])
|
||||
recording_data = result['recording']
|
||||
release_datas = get_elem_from_obj(recording_data, ['release-list'])
|
||||
artist_datas = get_elem_from_obj(recording_data, ['artist-credit'], return_if_none={})
|
||||
|
||||
# appending all the artists to results
|
||||
for artist_data in artist_datas:
|
||||
results.append(Option('artist', get_elem_from_obj(artist_data, ['artist', 'id']),
|
||||
get_elem_from_obj(artist_data, ['artist', 'name'])))
|
||||
|
||||
# appending all releases
|
||||
for i, release_data in enumerate(release_datas):
|
||||
results.append(
|
||||
Option('release', get_elem_from_obj(release_data, ['id']), get_elem_from_obj(release_data, ['title']),
|
||||
additional_info=f" ({get_elem_from_obj(release_data, ['status'])})"))
|
||||
|
||||
results.append(recording)
|
||||
|
||||
return results
|
||||
|
||||
def fetch_new_options(self) -> MultipleOptions:
|
||||
if self.current_option is None:
|
||||
return -1
|
||||
|
||||
result = []
|
||||
if self.current_option.type == 'artist':
|
||||
result = self.fetch_new_options_from_artist(self.current_option)
|
||||
elif self.current_option.type == 'release_group':
|
||||
result = self.fetch_new_options_from_release_group(self.current_option)
|
||||
elif self.current_option.type == 'release':
|
||||
result = self.fetch_new_options_from_release(self.current_option)
|
||||
elif self.current_option.type == 'recording':
|
||||
result = self.fetch_new_options_from_record(self.current_option)
|
||||
|
||||
return self.append_new_choices(result)
|
||||
|
||||
def choose(self, index: int) -> MultipleOptions:
|
||||
if len(self.options_history) == 0:
|
||||
logging.error("initial query neaded before choosing")
|
||||
return MultipleOptions([])
|
||||
|
||||
latest_options = self.options_history[-1]
|
||||
if index >= len(latest_options):
|
||||
logging.error("index outside of options")
|
||||
return MultipleOptions([])
|
||||
|
||||
self.current_option = latest_options[index]
|
||||
return self.fetch_new_options()
|
||||
|
||||
@staticmethod
|
||||
def search_recording_from_text(artist: str = None, release_group: str = None, recording: str = None,
|
||||
query: str = None):
|
||||
result = musicbrainzngs.search_recordings(artist=artist, release=release_group, recording=recording,
|
||||
query=query)
|
||||
recording_list = get_elem_from_obj(result, ['recording-list'], return_if_none=[])
|
||||
|
||||
resulting_options = [
|
||||
Option("recording", get_elem_from_obj(recording_, ['id']), get_elem_from_obj(recording_, ['title']),
|
||||
additional_info=f" of {get_elem_from_obj(recording_, ['release-list', 0, 'title'])} by {get_elem_from_obj(recording_, ['artist-credit', 0, 'name'])}")
|
||||
for recording_ in recording_list]
|
||||
return resulting_options
|
||||
|
||||
@staticmethod
|
||||
def search_release_group_from_text(artist: str = None, release_group: str = None, query: str = None):
|
||||
result = musicbrainzngs.search_release_groups(artist=artist, releasegroup=release_group, query=query)
|
||||
release_group_list = get_elem_from_obj(result, ['release-group-list'], return_if_none=[])
|
||||
|
||||
resulting_options = [Option("release_group", get_elem_from_obj(release_group_, ['id']),
|
||||
get_elem_from_obj(release_group_, ['title']),
|
||||
additional_info=f" by {get_elem_from_obj(release_group_, ['artist-credit', 0, 'name'])}")
|
||||
for release_group_ in release_group_list]
|
||||
return resulting_options
|
||||
|
||||
@staticmethod
|
||||
def search_artist_from_text(artist: str = None, query: str = None):
|
||||
result = musicbrainzngs.search_artists(artist=artist, query=query)
|
||||
artist_list = get_elem_from_obj(result, ['artist-list'], return_if_none=[])
|
||||
|
||||
resulting_options = [Option("artist", get_elem_from_obj(artist_, ['id']), get_elem_from_obj(artist_, ['name']),
|
||||
additional_info=f": {', '.join([i['name'] for i in get_elem_from_obj(artist_, ['tag-list'], return_if_none=[])])}")
|
||||
for artist_ in artist_list]
|
||||
return resulting_options
|
||||
|
||||
def search_from_text(self, artist: str = None, release_group: str = None, recording: str = None) -> MultipleOptions:
|
||||
logger.info(
|
||||
f"searching specified artist: \"{artist}\", release group: \"{release_group}\", recording: \"{recording}\"")
|
||||
if artist is None and release_group is None and recording is None:
|
||||
logger.error("either artist, release group or recording has to be set")
|
||||
return MultipleOptions([])
|
||||
|
||||
if recording is not None:
|
||||
logger.info("search for recording")
|
||||
results = self.search_recording_from_text(artist=artist, release_group=release_group, recording=recording)
|
||||
elif release_group is not None:
|
||||
logger.info("search for release group")
|
||||
results = self.search_release_group_from_text(artist=artist, release_group=release_group)
|
||||
else:
|
||||
logger.info("search for artist")
|
||||
results = self.search_artist_from_text(artist=artist)
|
||||
|
||||
return self.append_new_choices(results)
|
||||
|
||||
def search_from_text_unspecified(self, query: str) -> MultipleOptions:
|
||||
logger.info(f"searching unspecified: \"{query}\"")
|
||||
|
||||
results = []
|
||||
results.extend(self.search_artist_from_text(query=query))
|
||||
results.extend(self.search_release_group_from_text(query=query))
|
||||
results.extend(self.search_recording_from_text(query=query))
|
||||
|
||||
return self.append_new_choices(results)
|
||||
|
||||
def search_from_query(self, query: str) -> MultipleOptions:
    """
    Parse a query string and run the matching search.

    A ``#`` starts a new parameter; the letter directly behind it sets the
    kind of the parameter (``a`` artist, ``r`` release group, ``t`` recording):
    "#a Psychonaut 4 #r Tired, Numb and #t Drop by Drop"
    If no # is in the query it gets treated as "unspecified query".

    :param query: the raw query, or None
    :return: an empty ``MultipleOptions`` for None, otherwise the result of
        ``search_from_text_unspecified`` or ``search_from_text``
    :raises ValueError: if more than ``MAX_PARAMETERS`` parameters are given
    """
    if query is None:
        return MultipleOptions([])

    if '#' not in query:
        return self.search_from_text_unspecified(query)

    artist = None
    release_group = None
    recording = None

    # Drop every empty segment. The previous ``parameters.remove('')`` raised
    # ValueError when text preceded the first '#' (no empty segment at all)
    # and removed only one segment when there were several ("##").
    parameters = [segment for segment in query.strip().split('#') if segment.strip()]

    if len(parameters) > MAX_PARAMETERS:
        raise ValueError(f"too many parameters. Only {MAX_PARAMETERS} are allowed")

    for parameter in parameters:
        # everything up to the first space is the kind, the rest is the value
        type_, _, remainder = parameter.partition(" ")
        input_ = remainder.strip()

        if type_ == "a":
            artist = input_
        elif type_ == "r":
            release_group = input_
        elif type_ == "t":
            recording = input_
        # segments of any other kind are ignored

    return self.search_from_text(artist=artist, release_group=release_group, recording=recording)
|
||||
|
||||
|
||||
def automated_demo():
    """Scripted demo: search for a fixed artist, then pick four hard-coded options in a row."""
    search = Search()
    search.search_from_text(artist="I Prevail")

    # in order: choose an artist, a release group, a release and a recording
    for option_index in (0, 9, 2, 4):
        search.choose(option_index)
|
||||
|
||||
|
||||
def interactive_demo():
    """
    Small interactive shell around ``Search``.

    Reads commands from stdin until "q" or "ok" is entered:
    ".." goes back to the previous options, a number chooses that option
    and everything else is handed to ``Search.search_from_query``.
    """
    search = Search()
    while True:
        # str.strip returns a new string; the result has to be kept, otherwise
        # input with surrounding whitespace (" ok ") is not recognised
        input_ = input(
            "q to quit, .. for previous options, int for this element, str to search for query, ok to download: ").strip()
        command = input_.lower()

        if command in ("ok", "q"):
            break
        if command == "..":
            search.get_previous_options()
            continue
        if input_.isdigit():
            search.choose(int(input_))
            continue
        search.search_from_query(input_)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
interactive_demo()
|
@@ -1,4 +0,0 @@
|
||||
from enum import Enum
|
||||
|
||||
class Providers(Enum):
    """Closed set of provider identifiers; currently only MusicBrainz."""

    # the value is the same text as the member name
    musicbrainz = "musicbrainz"
|
@@ -1,59 +0,0 @@
|
||||
from typing import List
|
||||
import musicbrainzngs
|
||||
|
||||
from src.music_kraken.database import (
|
||||
Artist,
|
||||
Album,
|
||||
Song
|
||||
)
|
||||
from src.music_kraken.utils.object_handeling import (
|
||||
get_elem_from_obj
|
||||
)
|
||||
|
||||
|
||||
def get_artist(flat: bool = False) -> Artist:
    """
    Return a new, empty ``Artist``.

    :param flat: meant to skip fetching additional data (like the discography);
        that fetching is not implemented, so both modes return the same object
    :return: the freshly created ``Artist``
    """
    artist = Artist()
    # nothing beyond the flat object is fetched yet, regardless of `flat`
    return artist
|
||||
|
||||
|
||||
def get_album(flat: bool = False) -> Album:
    """
    Return a new, empty ``Album``.

    :param flat: meant to skip fetching additional data (like the tracklist);
        that fetching is not implemented, so both modes return the same object
    :return: the freshly created ``Album``
    """
    album = Album()
    # nothing beyond the flat object is fetched yet, regardless of `flat`
    return album
|
||||
|
||||
|
||||
def get_song(mb_id: str, flat: bool = False) -> Song:
    """
    Fetch a recording from MusicBrainz and build a ``Song`` from it.

    :param mb_id: MusicBrainz id of the recording
    :param flat: if True, return right after the song object has been built
    :return: the ``Song``; NOTE: on a MusicBrainz network error ``None`` is
        returned instead, despite the annotation
    """
    wanted_includes = ["artists", "releases", "recording-rels", "isrcs", "work-level-rels"]
    try:
        response = musicbrainzngs.get_recording_by_id(mb_id, includes=wanted_includes)
    except musicbrainzngs.musicbrainz.NetworkError:
        return None

    recording = response['recording']

    song = Song(
        mb_id=mb_id,
        title=recording['title'],
        length=get_elem_from_obj(recording, ['length']),
        isrc=get_elem_from_obj(recording, ['isrc-list', 0])
    )

    if not flat:
        # The ids of the credited artists and of the last listed release are
        # looked up here, but nothing is done with them yet.
        for credit in get_elem_from_obj(recording, ['artist-credit'], return_if_none=[]):
            mb_artist_id = get_elem_from_obj(credit, ['artist', 'id'])

        release = get_elem_from_obj(recording, ['release-list', -1])
        mb_release_id = get_elem_from_obj(release, ['id'])

    return song
|
@@ -1,172 +0,0 @@
|
||||
import requests
|
||||
from typing import List
|
||||
from bs4 import BeautifulSoup
|
||||
import pycountry
|
||||
|
||||
from src.music_kraken.database import (
|
||||
Lyrics,
|
||||
Song,
|
||||
Artist
|
||||
)
|
||||
from src.music_kraken.utils.shared import *
|
||||
from src.music_kraken.utils import phonetic_compares
|
||||
from src.music_kraken.utils.object_handeling import get_elem_from_obj
|
||||
|
||||
TIMEOUT = 10
|
||||
|
||||
# search doesn't support isrc
|
||||
# https://genius.com/api/search/multi?q=I Prevail - Breaking Down
|
||||
# https://genius.com/api/songs/6192944
|
||||
# https://docs.genius.com/
|
||||
|
||||
session = requests.Session()
|
||||
session.headers = {
|
||||
"Connection": "keep-alive",
|
||||
"Referer": "https://genius.com/search/embed"
|
||||
}
|
||||
session.proxies = proxies
|
||||
|
||||
logger = GENIUS_LOGGER
|
||||
|
||||
|
||||
class LyricsSong:
    """
    One song hit of a Genius search together with its lyrics.

    On construction the hit is compared with the desired artist/title and,
    if it is accepted, the lyrics are scraped from the hit's page.
    ``valid`` is True only if the hit was accepted and lyrics were found.
    """

    def __init__(self, raw_data: dict, desirered_data: dict):
        """
        :param raw_data: one search hit; the song data is read from its 'result' key
        :param desirered_data: dict with the keys 'artist' and 'track' the hit is compared against
        """
        self.raw_data = raw_data
        self.desired_data = desirered_data

        # Set up front: a rejected hit returns early below, and without this
        # `lyrics_object` raised AttributeError for such hits.
        self.lyrics = None

        song_data = get_elem_from_obj(self.raw_data, ['result'], return_if_none={})
        self.id = get_elem_from_obj(song_data, ['id'])
        self.artist = get_elem_from_obj(song_data, ['primary_artist', 'name'])
        self.title = get_elem_from_obj(song_data, ['title'])

        lang_code = get_elem_from_obj(song_data, ['language']) or "en"
        self.language = pycountry.languages.get(alpha_2=lang_code)
        # The lookup can come back empty for a code pycountry doesn't know;
        # `get_lyrics_object` then falls back to "en".
        self.lang = self.language.alpha_3 if self.language is not None else None
        self.url = get_elem_from_obj(song_data, ['url'])

        # maybe could be implemented
        self.lyricist: str

        if get_elem_from_obj(song_data, ['lyrics_state']) != "complete":
            logger.warning(
                f"lyrics state of {self.title} by {self.artist} is not complete but {get_elem_from_obj(song_data, ['lyrics_state'])}")

        self.valid = self.is_valid()
        if not self.valid:
            return
        logger.info(f"found lyrics for \"{self.__repr__()}\"")

        self.lyrics = self.fetch_lyrics()
        if self.lyrics is None:
            self.valid = False

    def is_valid(self) -> bool:
        """
        Compare title and artist of the hit with the desired data.

        NOTE(review): the first value returned by the ``phonetic_compares``
        helpers is treated as "does NOT match" here - confirm against the helpers.

        :return: True if neither comparison reports a mismatch
        """
        title_match, title_distance = phonetic_compares.match_titles(self.title, self.desired_data['track'])
        artist_match, artist_distance = phonetic_compares.match_artists(self.desired_data['artist'], self.artist)

        return not title_match and not artist_match

    def __repr__(self) -> str:
        return f"{self.title} by {self.artist} ({self.url})"

    def fetch_lyrics(self) -> str | None:
        """
        Download the page at ``self.url`` and extract the lyrics text.

        :return: the lyrics (also stored in ``self.lyrics``), or None on a
            timeout, a non-200 response or if no lyrics container is found
        """
        if not self.valid:
            logger.warning(f"{self.__repr__()} is invalid but the lyrics still get fetched. Something could be wrong.")

        try:
            r = session.get(self.url, timeout=TIMEOUT)
        except requests.exceptions.Timeout:
            logger.warning(f"{self.url} timed out after {TIMEOUT} seconds")
            return None
        if r.status_code != 200:
            logger.warning(f"{r.url} returned {r.status_code}:\n{r.content}")
            return None

        soup = BeautifulSoup(r.content, "html.parser")
        # the lyrics may be spread over several containers, e.g.
        # <div data-lyrics-container="true" class="Lyrics__Container-sc-1ynbvzw-6 YYrds">
        lyrics_soups = soup.find_all('div', {'data-lyrics-container': "true"})
        if len(lyrics_soups) == 0:
            logger.warning(f"didn't found lyrics on {self.url}")
            return None

        lyrics = "\n".join([lyrics_soup.getText(separator="\n", strip=True) for lyrics_soup in lyrics_soups])

        self.lyrics = lyrics
        return lyrics

    def get_lyrics_object(self) -> Lyrics | None:
        """
        :return: a ``Lyrics`` object for the fetched text, or None if there are no lyrics
        """
        if self.lyrics is None:
            return None
        return Lyrics(text=self.lyrics, language=self.lang or "en")

    lyrics_object = property(fget=get_lyrics_object)
|
||||
|
||||
|
||||
def process_multiple_songs(song_datas: list, desired_data: dict) -> List[LyricsSong]:
    """
    Wrap every search hit in a ``LyricsSong``.

    :param song_datas: the raw hits
    :param desired_data: the artist/track data every hit is compared against
    :return: one ``LyricsSong`` per hit, in the order of ``song_datas``
    """
    wrapped_hits = []
    for hit in song_datas:
        wrapped_hits.append(LyricsSong(hit, desired_data))
    return wrapped_hits
|
||||
|
||||
|
||||
def search_song_list(artist: str, track: str) -> List[LyricsSong]:
    """
    Search Genius for "<artist> - <track>" and wrap the song hits.

    :param artist: name of the artist
    :param track: title of the track
    :return: a ``LyricsSong`` for every hit of the first section of type
        "song"; an empty list on a timeout, an error status or if there is
        no such section
    """
    endpoint = "https://genius.com/api/search/multi"
    search_term = f"{artist} - {track}"
    url = f"{endpoint}?q={search_term}"
    logger.info(f"requesting {url}")

    desired_data = {
        'artist': artist,
        'track': track
    }

    try:
        # The term goes in via `params` so requests url-encodes it. Pasted raw
        # into the URL, characters like '+', '&' or '#' changed the query.
        r = session.get(endpoint, params={"q": search_term}, timeout=TIMEOUT)
    except requests.exceptions.Timeout:
        logger.warning(f"{url} timed out after {TIMEOUT} seconds")
        return []
    if r.status_code != 200:
        logger.warning(f"{r.url} returned {r.status_code}:\n{r.content}")
        return []
    content = r.json()
    if get_elem_from_obj(content, ['meta', 'status']) != 200:
        logger.warning(f"{r.url} returned {get_elem_from_obj(content, ['meta', 'status'])}:\n{content}")
        return []

    # default to an empty list so a response without sections isn't iterated as None
    sections = get_elem_from_obj(content, ['response', 'sections'], return_if_none=[])
    for section in sections:
        if get_elem_from_obj(section, ['type']) == "song":
            return process_multiple_songs(get_elem_from_obj(section, ['hits'], return_if_none=[]), desired_data)

    return []
|
||||
|
||||
|
||||
def fetch_lyrics_from_artist(song: Song, artist: Artist) -> List[Lyrics]:
    """
    Search lyrics for the song's title under the name of one artist.

    :param song: the song; its ``title`` is used for the search
    :param artist: the artist; its ``name`` is used for the search
    :return: the ``lyrics_object`` of every search hit flagged as valid
    """
    candidates = search_song_list(artist.name, song.title)
    return [candidate.lyrics_object for candidate in candidates if candidate.valid]
|
||||
|
||||
|
||||
def fetch_lyrics(song: Song) -> List[Lyrics]:
    """
    Collect the lyrics found for a song under each of its artists.

    :param song: the song; ``song.artists`` is iterated
    :return: all lyrics found, grouped in the order of the artists
    """
    return [
        found
        for artist in song.artists
        for found in fetch_lyrics_from_artist(song, artist)
    ]
|
||||
|
||||
|
||||
"""
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
songs = search("Zombiez", "WALL OF Z")
|
||||
for song in songs:
|
||||
print(song)
|
||||
"""
|
@@ -1,57 +0,0 @@
|
||||
import os
|
||||
|
||||
from ...utils.shared import *
|
||||
from ...utils import phonetic_compares
|
||||
|
||||
|
||||
def is_valid(a1, a2, t1, t2) -> bool:
    """
    Tell whether two artist values and two titles belong to the same song.

    NOTE(review): the first value returned by the ``phonetic_compares``
    helpers is treated as "does NOT match" here - confirm against the helpers.

    :param a1: first artist value
    :param a2: second artist value
    :param t1: first title
    :param t2: second title
    :return: True if neither the title nor the artist comparison reports a mismatch
    """
    title_mismatch, _ = phonetic_compares.match_titles(t1, t2)
    artist_mismatch, _ = phonetic_compares.match_artists(a1, a2)

    return not (title_mismatch or artist_mismatch)
|
||||
|
||||
|
||||
def get_metadata(file):
    """
    Read the 'artist' and 'title' tags of an audio file via ``EasyID3``.

    :param file: path of the file
    :return: tuple ``(artist, title)`` with the raw tag values
    """
    tags = EasyID3(file)
    return tags['artist'], tags['title']
|
||||
|
||||
|
||||
def check_for_song(folder, artists, title):
    """
    Check whether a folder contains a file whose tags match the given song.

    :param folder: directory to look into
    :param artists: artists of the wanted song
    :param title: title of the wanted song
    :return: True as soon as one file matches; False if none does or the
        folder doesn't exist
    """
    if not os.path.exists(folder):
        return False

    candidates = [os.path.join(folder, entry) for entry in os.listdir(folder)]
    for candidate in candidates:
        found_artists, found_title = get_metadata(candidate)
        if is_valid(artists, found_artists, title, found_title):
            return True

    return False
|
||||
|
||||
|
||||
def get_path(row):
    """
    Print artists, title and folder of a row and look for the song in that folder.

    :param row: mapping with the keys 'title', 'artists' and 'path'
        ('path' is joined onto ``MUSIC_DIR``)
    :return: always None; the result of the lookup is discarded
    """
    song_title = row['title']
    song_artists = row['artists']
    folder = os.path.join(MUSIC_DIR, row['path'])

    print(song_artists, song_title, folder)
    check_for_song(folder, song_artists, song_title)

    return None
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
row = {'artists': ['Psychonaut 4'], 'id': '6b40186b-6678-4328-a4b8-eb7c9806a9fb', 'tracknumber': None,
|
||||
'titlesort ': None, 'musicbrainz_releasetrackid': '6b40186b-6678-4328-a4b8-eb7c9806a9fb',
|
||||
'musicbrainz_albumid': '0d229a02-74f6-4c77-8c20-6612295870ae', 'title': 'Sweet Decadance', 'isrc': None,
|
||||
'album': 'Neurasthenia', 'copyright': 'Talheim Records', 'album_status': 'Official', 'language': 'eng',
|
||||
'year': '2016', 'date': '2016-10-07', 'country': 'AT', 'barcode': None, 'albumartist': 'Psychonaut 4',
|
||||
'albumsort': None, 'musicbrainz_albumtype': 'Album', 'compilation': None,
|
||||
'album_artist_id': 'c0c720b5-012f-4204-a472-981403f37b12', 'path': 'dsbm/Psychonaut 4/Neurasthenia',
|
||||
'file': 'dsbm/Psychonaut 4/Neurasthenia/Sweet Decadance.mp3', 'genre': 'dsbm', 'url': None, 'src': None}
|
||||
print(get_path(row))
|
@@ -1,181 +0,0 @@
|
||||
import time
|
||||
|
||||
import requests
|
||||
import bs4
|
||||
|
||||
from ...utils.shared import *
|
||||
from ...utils import phonetic_compares
|
||||
|
||||
from .source import AudioSource
|
||||
from ...database import song as song_objects
|
||||
|
||||
|
||||
TRIES = 5
|
||||
TIMEOUT = 10
|
||||
|
||||
logger = MUSIFY_LOGGER
|
||||
|
||||
session = requests.Session()
|
||||
session.headers = {
|
||||
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:106.0) Gecko/20100101 Firefox/106.0",
|
||||
"Connection": "keep-alive",
|
||||
"Referer": "https://musify.club/"
|
||||
}
|
||||
session.proxies = proxies
|
||||
|
||||
|
||||
class Musify(AudioSource):
    """Audio source that looks tracks up on https://musify.club and downloads their mp3 files."""

    @classmethod
    def fetch_source(cls, song: dict) -> str | None:
        """
        Try to find a download link for the song.

        The autocomplete api is tried for every artist first, then the html
        of the search page.

        :param song: NOTE(review): annotated as dict, but used as an object
            with ``title`` and ``get_artist_names()``
        :return: the download url, or None if nothing was found
        """
        super().fetch_source(song)

        title = song.title
        artists = song.get_artist_names()

        # trying to get a download link via the autocomplete api
        for artist in artists:
            url = cls.fetch_source_from_autocomplete(title=title, artist=artist)
            if url is not None:
                logger.info(f"found download link {url}")
                return url

        # trying to get a download link via the html of the direct search page
        for artist in artists:
            url = cls.fetch_source_from_search(title=title, artist=artist)
            if url is not None:
                logger.info(f"found download link {url}")
                return url

        logger.warning(f"Didn't find the audio on {cls.__name__}")
        return None

    @classmethod
    def get_download_link(cls, track_url: str) -> str | None:
        """
        Turn the url of a track page into the url of its mp3 file.

        '/track/sundenklang-wenn-mein-herz-schreit-3883217' becomes
        'https://musify.club/track/dl/3883217/sundenklang-wenn-mein-herz-schreit.mp3'

        :param track_url: url or path of the track page
        :return: the download url, or None if the url ends with a slash
        """
        file_ = track_url.split("/")[-1]
        if len(file_) == 0:
            return None
        # the id is everything behind the last dash, the name everything before it
        musify_name, _, musify_id = file_.rpartition("-")

        return f"https://musify.club/track/dl/{musify_id}/{musify_name}.mp3"

    @classmethod
    def fetch_source_from_autocomplete(cls, title: str, artist: str) -> str | None:
        """
        Look the track up with the search suggestion api.

        :return: download url of the first suggested track whose label
            contains the artist, or None
        """
        url = f"https://musify.club/search/suggestions?term={artist} - {title}"

        try:
            logger.info(f"calling {url}")
            # bounded by a timeout like the other requests, so a hanging
            # server can't block forever
            r = session.get(url=url, timeout=TIMEOUT)
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            logger.info("connection error occurred")
            return None
        if r.status_code == 200:
            autocomplete = r.json()
            for song in autocomplete:
                if artist in song['label'] and "/track" in song['url']:
                    return cls.get_download_link(song['url'])

        return None

    @classmethod
    def get_soup_of_search(cls, query: str, trie=0) -> bs4.BeautifulSoup | None:
        """
        Request the search page for a query and parse it.

        A 503 response is retried after ``TIMEOUT`` seconds, ``TRIES`` times at most.

        :param query: the search text
        :param trie: number of attempts already made
        :return: the parsed page, or None on a timeout or an error status
        """
        url = f"https://musify.club/search?searchText={query}"
        logger.debug(f"Trying to get soup from {url}")
        try:
            r = session.get(url, timeout=15)
        except requests.exceptions.Timeout:
            return None

        if r.status_code == 200:
            return bs4.BeautifulSoup(r.content, features="html.parser")

        if r.status_code == 503:
            if trie < TRIES:
                logger.warning(f"musify blocked the request. ({trie}-{TRIES})")
                logger.warning(f"retrying in {TIMEOUT} seconds again")
                time.sleep(TIMEOUT)
                return cls.get_soup_of_search(query, trie=trie + 1)
            logger.warning("too many tries, returning")
        else:
            # other error codes are not retried
            logger.warning(f"{r.url} returned {r.status_code}")
        return None

    @classmethod
    def fetch_source_from_search(cls, title: str, artist: str) -> str | None:
        """
        Look the track up in the html of the search page.

        :return: download url of the first listed track that matches title
            and artist, or None
        """
        # `artist` is a single name; indexing it (`artist[0]`) put only its
        # first character into the query
        query: str = f"{artist} - {title}"
        search_soup = cls.get_soup_of_search(query=query)
        if search_soup is None:
            return None

        # get the soup of the container with all track results
        tracklist_container_soup = search_soup.find_all("div", {"class": "playlist"})
        if len(tracklist_container_soup) == 0:
            return None
        if len(tracklist_container_soup) != 1:
            logger.warning("HTML Layout of https://musify.club changed. (or bug)")
        tracklist_container_soup = tracklist_container_soup[0]

        tracklist_soup = tracklist_container_soup.find_all("div", {"class": "playlist__details"})

        def parse_track_soup(_track_soup):
            # the first anchor holds the artist, the second one the track and its url
            anchor_soups = _track_soup.find_all("a")
            if len(anchor_soups) < 2:
                return None
            artist_ = anchor_soups[0].text.strip()
            track_ = anchor_soups[1].text.strip()
            url_ = anchor_soups[1]['href']
            return artist_, track_, url_

        # check each track in the container, if they match
        for track_soup in tracklist_soup:
            parsed = parse_track_soup(track_soup)
            if parsed is None:
                # an entry without both anchors can't be compared
                continue
            artist_option, title_option, track_url = parsed

            # NOTE(review): a truthy first value is treated as "does NOT match" - confirm
            title_match, title_distance = phonetic_compares.match_titles(title, title_option)
            artist_match, artist_distance = phonetic_compares.match_artists(artist, artist_option)

            logger.debug(f"{(title, title_option, title_match, title_distance)}")
            logger.debug(f"{(artist, artist_option, artist_match, artist_distance)}")

            if not title_match and not artist_match:
                return cls.get_download_link(track_url)

        return None

    @classmethod
    def download_from_musify(cls, target: song_objects.Target, url):
        """
        Download the file at ``url`` and write it to ``target.file``.

        :param target: where to store the file; ``path`` and ``file`` have to be set
        :param url: the download url
        :return: True if the file was written, False otherwise
        """
        # returns if target hasn't been set
        if target.path is None or target.file is None:
            logger.warning(f"target hasn't been set. Can't download. Most likely a bug.")
            return False

        # download the audio data
        logger.info(f"downloading: '{url}'")
        try:
            r = session.get(url, timeout=TIMEOUT)
        except requests.exceptions.ConnectionError:
            return False
        except requests.exceptions.ReadTimeout:
            logger.warning(f"musify server didn't respond after {TIMEOUT} seconds")
            return False
        if r.status_code != 200:
            if r.status_code == 404:
                logger.warning(f"{r.url} was not found")
                return False
            if r.status_code == 503:
                logger.warning(f"{r.url} raised an internal server error")
                return False
            logger.error(f"\"{url}\" returned {r.status_code}: {r.text}")
            return False

        # write to the file and create folder if it doesn't exist
        os.makedirs(target.path, exist_ok=True)
        with open(target.file, "wb") as mp3_file:
            mp3_file.write(r.content)
        logger.info("finished")
        return True

    @classmethod
    def fetch_audio(cls, song: song_objects.Song, src: song_objects.Source):
        """
        Download the audio of ``src`` to the target of ``song``.

        :return: True on success, False otherwise
        """
        super().fetch_audio(song, src)
        return cls.download_from_musify(song.target, src.url)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pass
|
@@ -1,23 +0,0 @@
|
||||
from ...utils.shared import *
|
||||
from typing import Tuple
|
||||
|
||||
from ...database import song as song_objects
|
||||
|
||||
|
||||
logger = URL_DOWNLOAD_LOGGER
|
||||
|
||||
"""
|
||||
The class "Source" is the superclass every class for specific audio
|
||||
sources inherits from. This gives the advantage of a consistent
|
||||
calling of the functions to search for a song and to download it.
|
||||
"""
|
||||
|
||||
|
||||
class AudioSource:
    """
    Base class of the audio sources.

    Both methods only write a log line here.
    """

    @classmethod
    def fetch_source(cls, row: dict):
        """
        Log that a source for the song is looked up at this class.

        :param row: NOTE(review): annotated as dict, but its ``title`` attribute is read
        """
        source_name = cls.__name__
        logger.info(f"try getting source {row.title} from {source_name}")

    @classmethod
    def fetch_audio(cls, song: song_objects.Song, src: song_objects.Source):
        """
        Log which url is about to be downloaded to which file.

        :param song: the song; ``song.target.file`` is the destination
        :param src: the source; ``src.url`` is downloaded
        """
        source_name = cls.__name__
        logger.info(f"downloading {song}: {source_name} {src.url} -> {song.target.file}")
|
@@ -1,98 +0,0 @@
|
||||
from typing import List
|
||||
|
||||
import youtube_dl
|
||||
import time
|
||||
|
||||
from ...utils.shared import *
|
||||
from ...utils import phonetic_compares
|
||||
from .source import AudioSource
|
||||
|
||||
from ...database import song as song_objects
|
||||
|
||||
|
||||
logger = YOUTUBE_LOGGER
|
||||
|
||||
YDL_OPTIONS = {'format': 'bestaudio', 'noplaylist': 'True'}
|
||||
YOUTUBE_URL_KEY = 'webpage_url'
|
||||
YOUTUBE_TITLE_KEY = 'title'
|
||||
WAIT_BETWEEN_BLOCK = 10
|
||||
MAX_TRIES = 3
|
||||
|
||||
def youtube_length_to_mp3_length(youtube_len: float) -> int:
    """
    Multiply a length by 1000 and truncate it to an int.

    :param youtube_len: the length reported for a video
    :return: ``youtube_len * 1000`` with the fractional part cut off
    """
    scaled = youtube_len * 1000
    return int(scaled)
|
||||
|
||||
|
||||
class Youtube(AudioSource):
    """Audio source that finds videos by ISRC and downloads them with youtube_dl."""

    @classmethod
    def get_youtube_from_isrc(cls, isrc: str) -> List[dict]:
        """
        Search for videos with the ISRC as search text.

        :param isrc: the ISRC of the song
        :return: one dict with the keys 'url', 'title' and 'length' per
            video; an empty list if youtube_dl raises a DownloadError
        """
        # https://stackoverflow.com/questions/63388364/searching-youtube-videos-using-youtube-dl
        with youtube_dl.YoutubeDL(YDL_OPTIONS) as ydl:
            try:
                videos = ydl.extract_info(f"ytsearch:{isrc}", download=False)['entries']
            except youtube_dl.utils.DownloadError:
                return []

        return [{
            'url': video[YOUTUBE_URL_KEY],
            'title': video[YOUTUBE_TITLE_KEY],
            # every result gets its own duration; `videos[0]` gave all of
            # them the duration of the first one
            'length': youtube_length_to_mp3_length(float(video['duration']))
        } for video in videos]

    @classmethod
    def fetch_source(cls, song: song_objects.Song):
        """
        Find a video for the song.

        Only songs with an ISRC are looked up. A result is accepted if its
        title and its length fit the song; the last accepted result is used.

        :return: url of the video, or None
        """
        # https://stackoverflow.com/questions/63388364/searching-youtube-videos-using-youtube-dl
        super().fetch_source(song)

        if not song.has_isrc():
            return None

        real_title = song.title.lower()

        final_result = None
        results = cls.get_youtube_from_isrc(song.isrc)
        for result in results:
            video_title = result['title'].lower()
            # NOTE(review): a truthy `match` is treated as "does NOT match" - confirm
            match, distance = phonetic_compares.match_titles(video_title, real_title)

            if match:
                continue

            if not phonetic_compares.match_length(song.length, result['length']):
                logger.warning(f"{song.length} doesn't match with {result}")
                continue

            final_result = result

        if final_result is None:
            return None
        logger.info(f"found video {final_result}")
        return final_result['url']

    @classmethod
    def fetch_audio(cls, song: song_objects.Song, src: song_objects.Source, trie: int = 0):
        """
        Download the audio of ``src.url`` to ``song.target.file``.

        A failed download is retried after ``WAIT_BETWEEN_BLOCK`` seconds,
        ``MAX_TRIES`` times at most.

        :param trie: number of retries already made
        :return: True on success, False otherwise
        """
        super().fetch_audio(song, src)
        if song.target.file is None or song.target.path is None:
            logger.warning(f"target hasn't been set. Can't download. Most likely a bug.")
            return False

        options = {
            'format': 'bestaudio/best',
            'keepvideo': False,
            'outtmpl': song.target.file
        }

        # downloading
        try:
            with youtube_dl.YoutubeDL(options) as ydl:
                ydl.download([src.url])

        except youtube_dl.utils.DownloadError:
            # retry when failing
            logger.warning(f"youtube blocked downloading. ({trie}-{MAX_TRIES})")
            if trie >= MAX_TRIES:
                logger.warning("too many tries, returning")
                return False
            logger.warning(f"retrying in {WAIT_BETWEEN_BLOCK} seconds again")
            time.sleep(WAIT_BETWEEN_BLOCK)
            return cls.fetch_audio(song, src, trie=trie + 1)

        # Report success explicitly: the failure paths return False, and the
        # implicit None returned before was falsy as well.
        return True
|
||||
|
@@ -1,3 +1,5 @@
|
||||
from .encyclopaedia_metallum import EncyclopaediaMetallum
|
||||
from .musify import Musify
|
||||
from .youtube import YouTube
|
||||
|
||||
from .abstract import Page, INDEPENDENT_DB_OBJECTS
|
||||
|
@@ -5,7 +5,6 @@ from typing import List, Optional, Type, Union
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import pycountry
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from ..connection import Connection
|
||||
@@ -20,7 +19,6 @@ from ..objects import (
|
||||
ID3Timestamp,
|
||||
FormattedText,
|
||||
Label,
|
||||
Options,
|
||||
Target,
|
||||
DatabaseObject
|
||||
)
|
||||
|
@@ -12,8 +12,10 @@ from ..objects import (
|
||||
Song,
|
||||
Album,
|
||||
Label,
|
||||
Target
|
||||
)
|
||||
from ..connection import Connection
|
||||
from ..utils.support_classes import DownloadResult
|
||||
|
||||
class Preset(Page):
|
||||
# CHANGE
|
||||
@@ -57,3 +59,6 @@ class Preset(Page):
|
||||
|
||||
def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
|
||||
return Label()
|
||||
|
||||
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
|
||||
return DownloadResult()
|
||||
|
@@ -1,46 +1,72 @@
|
||||
from typing import List
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
import pycountry
|
||||
|
||||
from ..utils.shared import (
|
||||
ENCYCLOPAEDIA_METALLUM_LOGGER as LOGGER
|
||||
)
|
||||
from typing import List, Optional, Type
|
||||
from urllib.parse import urlparse
|
||||
import logging
|
||||
|
||||
from ..objects import Source, DatabaseObject
|
||||
from .abstract import Page
|
||||
from ..database import (
|
||||
MusicObject,
|
||||
from ..objects import (
|
||||
Artist,
|
||||
Source,
|
||||
SourcePages,
|
||||
Song,
|
||||
Album,
|
||||
ID3Timestamp,
|
||||
FormattedText
|
||||
)
|
||||
from ..utils import (
|
||||
string_processing
|
||||
Label,
|
||||
Target
|
||||
)
|
||||
from ..connection import Connection
|
||||
from ..utils.support_classes import DownloadResult
|
||||
from ..utils.shared import YOUTUBE_LOGGER
|
||||
|
||||
INVIDIOUS_INSTANCE = "https://yewtu.be/feed/popular"
|
||||
|
||||
class Youtube(Page):
|
||||
"""
|
||||
The youtube downloader should use https://invidious.io/
|
||||
to make the request.
|
||||
They are an alternative frontend.
|
||||
"""
|
||||
- https://y.com.sb/api/v1/search?q=Zombiez+-+Topic&page=1&date=none&type=channel&duration=none&sort=relevance
|
||||
- https://y.com.sb/api/v1/channels/playlists/UCV0Ntl3lVR7xDXKoCU6uUXA
|
||||
- https://y.com.sb/api/v1/playlists/OLAK5uy_kcUBiDv5ATbl-R20OjNaZ5G28XFanQOmM
|
||||
"""
|
||||
|
||||
To find an artist, filter for channels and search for
|
||||
`{artist.name} - Topic`
|
||||
and then ofc check for viable results.
|
||||
|
||||
Ofc you can also implement searching songs by isrc.
|
||||
|
||||
NOTE: I didn't look at the invidious api yet. If it sucks,
|
||||
feel free to use projects like youtube-dl.
|
||||
But don't implement your own youtube client.
|
||||
I don't wanna maintain that shit.
|
||||
"""
|
||||
API_SESSION: requests.Session = requests.Session()
|
||||
|
||||
class YouTube(Page):
    """
    Page for YouTube.

    So far this is a skeleton: the searches and fetches return empty or
    placeholder objects and nothing is downloaded.
    """
    # CHANGE
    SOURCE_TYPE = SourcePages.YOUTUBE
    LOGGER = YOUTUBE_LOGGER

    def __init__(self, *args, **kwargs):
        # NOTE(review): the host looks like a leftover placeholder - confirm
        self.connection: Connection = Connection(
            host="https://www.preset.cum/",
            logger=self.LOGGER
        )

        super().__init__(*args, **kwargs)

    def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
        """Delegate to the implementation of the parent class."""
        return super().get_source_type(source)

    def general_search(self, search_query: str) -> List[DatabaseObject]:
        """Return a single placeholder artist named "works", whatever the query is."""
        return [Artist(name="works")]

    def label_search(self, label: Label) -> List[Label]:
        """Not implemented yet; returns an empty list."""
        return []

    def artist_search(self, artist: Artist) -> List[Artist]:
        """Not implemented yet; returns an empty list."""
        return []

    def album_search(self, album: Album) -> List[Album]:
        """Not implemented yet; returns an empty list."""
        return []

    def song_search(self, song: Song) -> List[Song]:
        """Not implemented yet; returns an empty list."""
        return []

    def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
        """Not implemented yet; returns an empty ``Song``."""
        return Song()

    def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
        """Not implemented yet; returns an empty ``Album``."""
        return Album()

    def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
        """Not implemented yet; returns an empty ``Artist``."""
        return Artist()

    def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
        """Not implemented yet; returns an empty ``Label``."""
        return Label()

    def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
        """Not implemented yet; returns a default ``DownloadResult``."""
        return DownloadResult()
|
||||
|
Reference in New Issue
Block a user