refactored module names and imports

This commit is contained in:
Lars Noack
2022-11-16 10:15:35 +01:00
parent 65bb48c3cf
commit e9056771ff
17 changed files with 33 additions and 53 deletions

View File

@@ -0,0 +1,88 @@
import mutagen.id3
import requests
import os.path
from mutagen.easyid3 import EasyID3
from pydub import AudioSegment
from ..utils.shared import *
from .sources import (
youtube,
musify,
local_files
)
logger = DOWNLOAD_LOGGER
"""
https://en.wikipedia.org/wiki/ID3
https://mutagen.readthedocs.io/en/latest/user/id3.html
# to get all valid keys
from mutagen.easyid3 import EasyID3
print("\n".join(EasyID3.valid_keys.keys()))
print(EasyID3.valid_keys.keys())
"""
class Download:
def __init__(self):
for row in database.get_tracks_to_download():
row['artist'] = [i['name'] for i in row['artists']]
row['file'] = os.path.join(MUSIC_DIR, row['file'])
row['path'] = os.path.join(MUSIC_DIR, row['path'])
if self.path_stuff(row['path'], row['file']):
self.write_metadata(row, row['file'])
continue
download_success = None
src = row['src']
if src == 'musify':
download_success = musify.download(row)
elif src == 'youtube':
download_success = youtube.download(row)
if download_success == -1:
logger.warning(f"couldn't download {row['url']} from {row['src']}")
continue
self.write_metadata(row, row['file'])
@staticmethod
def write_metadata(row, file_path):
if not os.path.exists(file_path):
logger.warning("something went really wrong")
return False
# only convert the file to the proper format if mutagen doesn't work with it due to time
try:
audiofile = EasyID3(file_path)
except mutagen.id3.ID3NoHeaderError:
AudioSegment.from_file(file_path).export(file_path, format="mp3")
audiofile = EasyID3(file_path)
valid_keys = list(EasyID3.valid_keys.keys())
for key in list(row.keys()):
if key in valid_keys and row[key] is not None:
if type(row[key]) != list:
row[key] = str(row[key])
audiofile[key] = row[key]
logger.info("saving")
audiofile.save(file_path, v1=2)
@staticmethod
def path_stuff(path: str, file_: str):
# returns true if it shouldn't be downloaded
if os.path.exists(file_):
logger.info(f"'{file_}' does already exist, thus not downloading.")
return True
os.makedirs(path, exist_ok=True)
return False
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
s = requests.Session()
Download()

View File

@@ -0,0 +1,58 @@
from ..utils.shared import *
from .sources import (
youtube,
musify,
local_files
)
logger = URL_DOWNLOAD_LOGGER
class Download:
def __init__(self) -> None:
self.urls = []
for row in database.get_tracks_without_src():
row['artists'] = [artist['name'] for artist in row['artists']]
id_ = row['id']
if os.path.exists(os.path.join(MUSIC_DIR, row['file'])):
logger.info(f"skipping the fetching of the download links, cuz {row['file']} already exists.")
continue
"""
not implemented yet, will in one point crashe everything
# check File System
file_path = file_system.get_path(row)
if file_path is not None:
self.add_url(file_path, 'file', id_)
continue
"""
# check YouTube
youtube_url = youtube.get_youtube_url(row)
if youtube_url is not None:
self.add_url(youtube_url, 'youtube', id_)
continue
# check musify
musify_url = musify.get_musify_url(row)
if musify_url is not None:
self.add_url(musify_url, 'musify', id_)
continue
# check musify again, but with a different methode that takes longer
musify_url = musify.get_musify_url_slow(row)
if musify_url is not None:
self.add_url(musify_url, 'musify', id_)
continue
logger.warning(f"Didn't find any sources for {row['title']}")
@staticmethod
def add_url(url: str, src: str, id_: str):
database.set_download_data(id_, url, src)
if __name__ == "__main__":
download = Download()

View File

@@ -0,0 +1,57 @@
import os
from ...utils.shared import *
from ...utils import phonetic_compares
def is_valid(a1, a2, t1, t2) -> bool:
title_match, title_distance = phonetic_compares.match_titles(t1, t2)
artist_match, artist_distance = phonetic_compares.match_artists(a1, a2)
return not title_match and not artist_match
def get_metadata(file):
artist = None
title = None
audiofile = EasyID3(file)
artist = audiofile['artist']
title = audiofile['title']
return artist, title
def check_for_song(folder, artists, title):
if not os.path.exists(folder):
return False
files = [os.path.join(folder, i) for i in os.listdir(folder)]
for file in files:
artists_, title_ = get_metadata(file)
if is_valid(artists, artists_, title, title_):
return True
return False
def get_path(row):
title = row['title']
artists = row['artists']
path_ = os.path.join(MUSIC_DIR, row['path'])
print(artists, title, path_)
check_for_song(path_, artists, title)
return None
if __name__ == "__main__":
row = {'artists': ['Psychonaut 4'], 'id': '6b40186b-6678-4328-a4b8-eb7c9806a9fb', 'tracknumber': None,
'titlesort ': None, 'musicbrainz_releasetrackid': '6b40186b-6678-4328-a4b8-eb7c9806a9fb',
'musicbrainz_albumid': '0d229a02-74f6-4c77-8c20-6612295870ae', 'title': 'Sweet Decadance', 'isrc': None,
'album': 'Neurasthenia', 'copyright': 'Talheim Records', 'album_status': 'Official', 'language': 'eng',
'year': '2016', 'date': '2016-10-07', 'country': 'AT', 'barcode': None, 'albumartist': 'Psychonaut 4',
'albumsort': None, 'musicbrainz_albumtype': 'Album', 'compilation': None,
'album_artist_id': 'c0c720b5-012f-4204-a472-981403f37b12', 'path': 'dsbm/Psychonaut 4/Neurasthenia',
'file': 'dsbm/Psychonaut 4/Neurasthenia/Sweet Decadance.mp3', 'genre': 'dsbm', 'url': None, 'src': None}
print(get_path(row))

View File

@@ -0,0 +1,136 @@
import logging
import time
import requests
import bs4
from ...utils.shared import *
from ...utils import phonetic_compares
TRIES = 5
TIMEOUT = 10
session = requests.Session()
session.headers = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:106.0) Gecko/20100101 Firefox/106.0",
"Connection": "keep-alive",
"Referer": "https://musify.club/"
}
session.proxies = proxies
def get_musify_url(row):
title = row['title']
artists = row['artists']
url = f"https://musify.club/search/suggestions?term={artists[0]} - {title}"
try:
r = session.get(url=url)
except requests.exceptions.ConnectionError:
return None
if r.status_code == 200:
autocomplete = r.json()
for row in autocomplete:
if any(a in row['label'] for a in artists) and "/track" in row['url']:
return get_download_link(row['url'])
return None
def get_download_link(default_url):
# https://musify.club/track/dl/18567672/rauw-alejandro-te-felicito-feat-shakira.mp3
# /track/sundenklang-wenn-mein-herz-schreit-3883217'
file_ = default_url.split("/")[-1]
musify_id = file_.split("-")[-1]
musify_name = "-".join(file_.split("-")[:-1])
return f"https://musify.club/track/dl/{musify_id}/{musify_name}.mp3"
def download_from_musify(file, url):
logging.info(f"downloading: '{url}'")
try:
r = session.get(url, timeout=15)
except requests.exceptions.ConnectionError or requests.exceptions.ReadTimeout:
return -1
if r.status_code != 200:
if r.status_code == 404:
logging.warning(f"{r.url} was not found")
return -1
if r.status_code == 503:
logging.warning(f"{r.url} raised an internal server error")
return -1
raise ConnectionError(f"\"{url}\" returned {r.status_code}: {r.text}")
with open(file, "wb") as mp3_file:
mp3_file.write(r.content)
logging.info("finished")
def download(row):
url = row['url']
file_ = row['file']
return download_from_musify(file_, url)
def get_soup_of_search(query: str, trie=0):
url = f"https://musify.club/search?searchText={query}"
logging.debug(f"Trying to get soup from {url}")
r = session.get(url)
if r.status_code != 200:
if r.status_code in [503] and trie < TRIES:
logging.warning(f"youtube blocked downloading. ({trie}-{TRIES})")
logging.warning(f"retrying in {TIMEOUT} seconds again")
time.sleep(TIMEOUT)
return get_soup_of_search(query, trie=trie + 1)
logging.warning("too many tries, returning")
raise ConnectionError(f"{r.url} returned {r.status_code}:\n{r.content}")
return bs4.BeautifulSoup(r.content, features="html.parser")
def search_for_track(row):
track = row['title']
artist = row['artists']
soup = get_soup_of_search(f"{artist[0]} - {track}")
tracklist_container_soup = soup.find_all("div", {"class": "playlist"})
if len(tracklist_container_soup) == 0:
return None
if len(tracklist_container_soup) != 1:
raise Exception("Connfusion Error. HTML Layout of https://musify.club changed.")
tracklist_container_soup = tracklist_container_soup[0]
tracklist_soup = tracklist_container_soup.find_all("div", {"class": "playlist__details"})
def parse_track_soup(_track_soup):
anchor_soups = _track_soup.find_all("a")
band_name = anchor_soups[0].text.strip()
title = anchor_soups[1].text.strip()
url_ = anchor_soups[1]['href']
return band_name, title, url_
for track_soup in tracklist_soup:
band_option, title_option, track_url = parse_track_soup(track_soup)
title_match, title_distance = phonetic_compares.match_titles(track, title_option)
band_match, band_distance = phonetic_compares.match_artists(artist, band_option)
logging.debug(f"{(track, title_option, title_match, title_distance)}")
logging.debug(f"{(artist, band_option, band_match, band_distance)}")
if not title_match and not band_match:
return get_download_link(track_url)
return None
def get_musify_url_slow(row):
result = search_for_track(row)
if result is not None:
return result
if __name__ == "__main__":
pass

View File

@@ -0,0 +1,86 @@
from typing import List
import youtube_dl
import logging
import time
from ...utils import phonetic_compares
YDL_OPTIONS = {'format': 'bestaudio', 'noplaylist': 'True'}
YOUTUBE_URL_KEY = 'webpage_url'
YOUTUBE_TITLE_KEY = 'title'
WAIT_BETWEEN_BLOCK = 10
MAX_TRIES = 3
def get_youtube_from_isrc(isrc: str) -> List[dict]:
# https://stackoverflow.com/questions/63388364/searching-youtube-videos-using-youtube-dl
with youtube_dl.YoutubeDL(YDL_OPTIONS) as ydl:
try:
videos = ydl.extract_info(f"ytsearch:{isrc}", download=False)['entries']
except youtube_dl.utils.DownloadError:
return []
return [{
'url': video[YOUTUBE_URL_KEY],
'title': video[YOUTUBE_TITLE_KEY]
} for video in videos]
def get_youtube_url(row):
if row['isrc'] is None:
return None
real_title = row['title'].lower()
final_result = None
results = get_youtube_from_isrc(row['isrc'])
for result in results:
video_title = result['title'].lower()
match, distance = phonetic_compares.match_titles(video_title, real_title)
if match:
logging.warning(
f"dont downloading {result['url']} cuz the phonetic distance ({distance}) between {real_title} and {video_title} is to high.")
continue
final_result = result
if final_result is None:
return None
return final_result['url']
def download(row, trie: int = 0):
url = row['url']
file_ = row['file']
options = {
'format': 'bestaudio/best',
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192',
}],
'keepvideo': False,
'outtmpl': file_
}
try:
with youtube_dl.YoutubeDL(options) as ydl:
ydl.download([url])
except youtube_dl.utils.DownloadError:
logging.warning(f"youtube blocked downloading. ({trie}-{MAX_TRIES})")
if trie >= MAX_TRIES:
logging.warning("too many tries, returning")
logging.warning(f"retrying in {WAIT_BETWEEN_BLOCK} seconds again")
time.sleep(WAIT_BETWEEN_BLOCK)
return download(row, trie=trie+1)
if __name__ == "__main__":
# example isrc that exists on YouTube music
ISRC = "DEUM71500715"
result = get_youtube_from_isrc(ISRC)
print(result)
result = get_youtube_from_isrc("aslhfklasdhfjklasdfjkhasdjlfhlasdjfkuuiueiw")
print(result)