continued

This commit is contained in:
lars 2022-11-17 13:23:27 +01:00
parent 0ccb3f244c
commit 5084534cb8
7 changed files with 175 additions and 48 deletions

View File

@ -26,16 +26,6 @@ from .lyrics import lyrics
import logging import logging
import os import os
# configure logger default
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler(os.path.join(temp_dir, LOG_FILE)),
logging.StreamHandler()
]
)
def get_existing_genre(): def get_existing_genre():
valid_directories = [] valid_directories = []

View File

@ -11,6 +11,8 @@ from .source import AudioSource
TRIES = 5 TRIES = 5
TIMEOUT = 10 TIMEOUT = 10
logger = MUSIFY_LOGGER
session = requests.Session() session = requests.Session()
session.headers = { session.headers = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:106.0) Gecko/20100101 Firefox/106.0", "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:106.0) Gecko/20100101 Firefox/106.0",
@ -22,23 +24,111 @@ session.proxies = proxies
class Musify(AudioSource): class Musify(AudioSource):
@classmethod @classmethod
def fetch_source(cls, row: dict): def fetch_source(cls, row: dict) -> str | None:
super().fetch_source(row) super().fetch_source(row)
title = row['title'] title = row['title']
artists = row['artists'] artists = row['artists']
url = f"https://musify.club/search/suggestions?term={artists[0]} - {title}" # trying to get a download link via the autocomplete api
for artist in artists:
url = cls.fetch_source_from_autocomplete(title=title, artist=artist)
if url is not None:
logger.info(f"found download link {url}")
return url
# trying to get a download link via the html of the direct search page
for artist in artists:
url = cls.fetch_source_from_search(title=title, artist=artist)
if url is not None:
logger.info(f"found download link {url}")
return url
logger.warning(f"Didn't find the audio on {cls.__name__}")
@classmethod
def get_download_link(cls, track_url: str) -> str | None:
# https://musify.club/track/dl/18567672/rauw-alejandro-te-felicito-feat-shakira.mp3
# /track/sundenklang-wenn-mein-herz-schreit-3883217'
file_ = track_url.split("/")[-1]
if len(file_) == 0:
return None
musify_id = file_.split("-")[-1]
musify_name = "-".join(file_.split("-")[:-1])
return f"https://musify.club/track/dl/{musify_id}/{musify_name}.mp3"
@classmethod
def fetch_source_from_autocomplete(cls, title: str, artist: str) -> str | None:
url = f"https://musify.club/search/suggestions?term={artist} - {title}"
try: try:
logger.info(f"calling {url}")
r = session.get(url=url) r = session.get(url=url)
except requests.exceptions.ConnectionError: except requests.exceptions.ConnectionError:
logger.info("connection error occurred")
return None return None
if r.status_code == 200: if r.status_code == 200:
autocomplete = r.json() autocomplete = r.json()
for row in autocomplete: for row in autocomplete:
if any(a in row['label'] for a in artists) and "/track" in row['url']: if artist in row['label'] and "/track" in row['url']:
return get_download_link(row['url']) return cls.get_download_link(row['url'])
return None
@classmethod
def get_soup_of_search(cls, query: str, trie=0) -> bs4.BeautifulSoup | None:
url = f"https://musify.club/search?searchText={query}"
logger.debug(f"Trying to get soup from {url}")
r = session.get(url)
if r.status_code != 200:
if r.status_code in [503] and trie < TRIES:
logging.warning(f"youtube blocked downloading. ({trie}-{TRIES})")
logging.warning(f"retrying in {TIMEOUT} seconds again")
time.sleep(TIMEOUT)
return get_soup_of_search(query, trie=trie + 1)
logging.warning("too many tries, returning")
return None
return bs4.BeautifulSoup(r.content, features="html.parser")
@classmethod
def fetch_source_from_search(cls, title: str, artist: str) -> str | None:
query: str = f"{artist[0]} - {title}"
search_soup = cls.get_soup_of_search(query=query)
if search_soup is None:
return None
# get the soup of the container with all track results
tracklist_container_soup = search_soup.find_all("div", {"class": "playlist"})
if len(tracklist_container_soup) == 0:
return None
if len(tracklist_container_soup) != 1:
logger.warning("HTML Layout of https://musify.club changed. (or bug)")
tracklist_container_soup = tracklist_container_soup[0]
tracklist_soup = tracklist_container_soup.find_all("div", {"class": "playlist__details"})
def parse_track_soup(_track_soup):
anchor_soups = _track_soup.find_all("a")
artist_ = anchor_soups[0].text.strip()
track_ = anchor_soups[1].text.strip()
url_ = anchor_soups[1]['href']
return artist_, track_, url_
# check each track in the container, if they match
for track_soup in tracklist_soup:
artist_option, title_option, track_url = parse_track_soup(track_soup)
title_match, title_distance = phonetic_compares.match_titles(title, title_option)
artist_match, artist_distance = phonetic_compares.match_artists(artist, artist_option)
logging.debug(f"{(title, title_option, title_match, title_distance)}")
logging.debug(f"{(artist, artist_option, artist_match, artist_distance)}")
if not title_match and not artist_match:
return cls.get_download_link(track_url)
return None return None
@ -51,7 +141,6 @@ class Musify(AudioSource):
return download_from_musify(file_, url) return download_from_musify(file_, url)
def get_musify_url(row): def get_musify_url(row):
title = row['title'] title = row['title']
artists = row['artists'] artists = row['artists']

View File

@ -1,22 +0,0 @@
import jellyfish
TITLE_THRESHOLD_LEVENSHTEIN = 2
def match_titles(title_1: str, title_2: str) -> (bool, int):
distance = jellyfish.levenshtein_distance(title_1, title_2)
return distance > TITLE_THRESHOLD_LEVENSHTEIN, distance
def match_artists(artist_1, artist_2: str) -> (bool, int):
if type(artist_1) == list:
distances = []
for artist_1_ in artist_1:
match, distance = match_titles(artist_1_, artist_2)
if not match:
return match, distance
distances.append(distance)
return True, min(distances)
return match_titles(artist_1, artist_2)

View File

@ -22,7 +22,9 @@ class UrlPath:
self.genre = genre self.genre = genre
for row in database.get_tracks_without_filepath(): for row in database.get_tracks_without_filepath():
print(row)
file, path = self.get_path_from_row(row) file, path = self.get_path_from_row(row)
logger.info(f"setting target to {file}")
database.set_filepath(row['id'], file, path, genre) database.set_filepath(row['id'], file, path, genre)
def get_path_from_row(self, row): def get_path_from_row(self, row):

View File

@ -132,7 +132,7 @@ class Database:
"release_group.id == release_.release_group_id", "release_group.id == release_.release_group_id",
"artist_track.artist_id == artist.id", "artist_track.artist_id == artist.id",
"artist_track.track_id == track.id", "artist_track.track_id == track.id",
"source.track_id == track.id" "(track.id == source.track_id OR track.id NOT IN (SELECT track_id FROM source))"
] ]
where_args.extend(custom_where) where_args.extend(custom_where)
@ -274,3 +274,61 @@ if __name__ == "__main__":
database = Database(os.path.join(temp_dir, "metadata.db"), os.path.join(temp_dir, "database_structure.sql"), logger, database = Database(os.path.join(temp_dir, "metadata.db"), os.path.join(temp_dir, "database_structure.sql"), logger,
reset_anyways=True) reset_anyways=True)
"""
SELECT DISTINCT
json_object(
'artists', json_group_array(
(
SELECT DISTINCT json_object(
'id', artist.id,
'name', artist.name
)
)
),
'source', json_group_array(
(
SELECT json_object(
'src_', src_.src,
'url', src_.url,
'valid', src_.valid
)
)
),
'id', track.id,
'tracknumber', track.tracknumber,
'titlesort ', track.tracknumber,
'musicbrainz_releasetrackid', track.id,
'musicbrainz_albumid', release_.id,
'title', track.track,
'isrc', track.isrc,
'album', release_.title,
'copyright', release_.copyright,
'album_status', release_.album_status,
'language', release_.language,
'year', release_.year,
'date', release_.date,
'country', release_.country,
'barcode', release_.barcode,
'albumartist', release_group.albumartist,
'albumsort', release_group.albumsort,
'musicbrainz_albumtype', release_group.musicbrainz_albumtype,
'compilation', release_group.compilation,
'album_artist_id', release_group.album_artist_id,
'path', track.path,
'file', track.file,
'genre', track.genre,
'url', track.url,
'src', track.src,
'lyrics', track.lyrics
)
FROM track, release_, release_group, artist, artist_track
LEFT JOIN release_ id ON track.release_id = release_.id
LEFT JOIN release_group id ON release_.id = release_group.id
LEFT JOIN artist_track track_id ON track.id = artist_track.track_id
LEFT JOIN artist id ON artist_track.artist_id = artist.id
LEFT JOIN source src_ ON track.id = src_.track_id
GROUP BY track.id;
"""

View File

@ -10,33 +10,43 @@ LOG_FILE = "download_logs.log"
DATABASE_FILE = "metadata.db" DATABASE_FILE = "metadata.db"
DATABASE_STRUCTURE_FILE = "database_structure.sql" DATABASE_STRUCTURE_FILE = "database_structure.sql"
DATABASE_STRUCTURE_FALLBACK = "https://raw.githubusercontent.com/HeIIow2/music-downloader/master/assets/database_structure.sql" DATABASE_STRUCTURE_FALLBACK = "https://raw.githubusercontent.com/HeIIow2/music-downloader/master/assets/database_structure.sql"
temp_dir = os.path.join(tempfile.gettempdir(), TEMP_FOLDER)
if not os.path.exists(temp_dir):
os.mkdir(temp_dir)
# configure logger default
logging.basicConfig(
level=logging.INFO,
format=logging.BASIC_FORMAT,
handlers=[
logging.FileHandler(os.path.join(temp_dir, LOG_FILE)),
logging.StreamHandler()
]
)
SEARCH_LOGGER = logging.getLogger("mb-cli") SEARCH_LOGGER = logging.getLogger("mb-cli")
DATABASE_LOGGER = logging.getLogger("database") DATABASE_LOGGER = logging.getLogger("database")
METADATA_DOWNLOAD_LOGGER = logging.getLogger("metadata-download") METADATA_DOWNLOAD_LOGGER = logging.getLogger("metadata")
URL_DOWNLOAD_LOGGER = logging.getLogger("AudioSource") URL_DOWNLOAD_LOGGER = logging.getLogger("AudioSource")
YOUTUBE_LOGGER = logging.getLogger("Youtube") YOUTUBE_LOGGER = logging.getLogger("Youtube")
MUSIFY_LOGGER = logging.getLogger("Musify")
PATH_LOGGER = logging.getLogger("create-paths") PATH_LOGGER = logging.getLogger("create-paths")
DOWNLOAD_LOGGER = logging.getLogger("download") DOWNLOAD_LOGGER = logging.getLogger("download")
LYRICS_LOGGER = logging.getLogger("lyrics") LYRICS_LOGGER = logging.getLogger("lyrics")
GENIUS_LOGGER = logging.getLogger("genius") GENIUS_LOGGER = logging.getLogger("genius")
NOT_A_GENRE = ".", "..", "misc_scripts", "Music", "script", ".git", ".idea"
MUSIC_DIR = os.path.expanduser('~/Music')
temp_dir = os.path.join(tempfile.gettempdir(), TEMP_FOLDER)
if not os.path.exists(temp_dir):
os.mkdir(temp_dir)
logging.getLogger("musicbrainzngs").setLevel(logging.WARNING) logging.getLogger("musicbrainzngs").setLevel(logging.WARNING)
musicbrainzngs.set_useragent("metadata receiver", "0.1", "https://github.com/HeIIow2/music-downloader") musicbrainzngs.set_useragent("metadata receiver", "0.1", "https://github.com/HeIIow2/music-downloader")
NOT_A_GENRE = ".", "..", "misc_scripts", "Music", "script", ".git", ".idea"
MUSIC_DIR = os.path.expanduser('~/Music')
database = Database(os.path.join(temp_dir, DATABASE_FILE), database = Database(os.path.join(temp_dir, DATABASE_FILE),
os.path.join(temp_dir, DATABASE_STRUCTURE_FILE), os.path.join(temp_dir, DATABASE_STRUCTURE_FILE),
DATABASE_STRUCTURE_FALLBACK, DATABASE_STRUCTURE_FALLBACK,
DATABASE_LOGGER, DATABASE_LOGGER,
reset_anyways=True) reset_anyways=False)
TOR = False TOR = False
@ -46,4 +56,4 @@ proxies = {
} if TOR else {} } if TOR else {}
# only the sources here will get downloaded, in the order the list is ordered # only the sources here will get downloaded, in the order the list is ordered
AUDIO_SOURCES = ["Youtube", "Musify"] AUDIO_SOURCES = ["Musify", "Youtube"]