fuck yea refactored lyrics

This commit is contained in:
Hellow 2022-11-25 18:27:48 +01:00
parent 696de60249
commit 693d04800a
9 changed files with 123 additions and 54 deletions

View File

@ -178,7 +178,7 @@ def cli(start_at: int = 0, only_lyrics: bool = False):
if start_at <= 4:
logging.info("starting to fetch the lyrics")
lyrics.fetch_lyrics()
lyrics.fetch_lyrics(cache.get_tracks_for_lyrics())
def gtk_gui():

View File

@ -1,9 +1,7 @@
import music_kraken
# from .audio_source.sources.musify import Musify
from .audio_source.sources.youtube import Youtube
if __name__ == "__main__":
music_kraken.cli()
music_kraken.cli(start_at=4, only_lyrics=True)
# Youtube.fetch_audio({'title': 'dfas', '': '', 'isrc': ''})
# Youtube.fetch_audio({'title': 'dfas', 'url': '', 'file': 'dasf', 'isrc': ''})

View File

@ -5,7 +5,6 @@ from . import (
metadata,
source,
target,
lyrics
)
Song = song.Song
@ -13,6 +12,6 @@ Artist = artist.Artist
Source = source.Source
Target = target.Target
Metadata = metadata.Metadata
Lyrics = lyrics.Lyrics
Lyrics = song.Lyrics
cache = temp_database.TempDatabase()

View File

@ -7,16 +7,17 @@ import requests
from . import song
class Database:
def __init__(self, path_to_db: str, db_structure: str, db_structure_fallback: str, logger: logging.Logger, reset_anyways: bool = False):
self.logger = logger
def __init__(self, path_to_db: str, db_structure: str, db_structure_fallback: str, reset_anyways: bool = False):
self.path_to_db = path_to_db
self.connection = sqlite3.connect(self.path_to_db)
self.cursor = self.connection.cursor()
# init database
self.init_db(database_structure=db_structure, database_structure_fallback=db_structure_fallback, reset_anyways=reset_anyways)
self.init_db(database_structure=db_structure, database_structure_fallback=db_structure_fallback,
reset_anyways=reset_anyways)
def init_db(self, database_structure: str, database_structure_fallback: str, reset_anyways: bool = False):
# check if db exists
@ -29,16 +30,16 @@ class Database:
exists = False
if not exists:
self.logger.info("Database does not exist yet.")
logger.info("Database does not exist yet.")
if reset_anyways or not exists:
# reset the database if reset_anyways is true or if an error has been thrown previously.
self.logger.info("Creating/Reseting Database.")
logger.info("Creating/Reseting Database.")
if not os.path.exists(database_structure):
self.logger.info("database structure file doesn't exist yet, fetching from github")
logger.info("database structure file doesn't exist yet, fetching from github")
r = requests.get(database_structure_fallback)
with open(database_structure, "w") as f:
f.write(r.text)
@ -221,13 +222,13 @@ GROUP BY track.id;
def get_tracks_for_lyrics(self) -> List[song.Song]:
return self.get_custom_track(["track.lyrics IS NULL"])
def add_lyrics(self, track_id: str, lyrics: str):
def add_lyrics(self, song: song.Song, lyrics: song.Lyrics):
query = f"""
UPDATE track
SET lyrics = ?
WHERE '{track_id}' == id;
WHERE '{song.id}' == id;
"""
self.cursor.execute(query, (str(lyrics), ))
self.cursor.execute(query, (str(lyrics.text),))
self.connection.commit()
def update_download_status(self, track_id: str):
@ -237,7 +238,7 @@ WHERE '{track_id}' == id;
def set_field_of_song(self, track_id: str, key: str, value: str):
query = f"UPDATE track SET {key} = ? WHERE '{track_id}' == id;"
self.cursor.execute(query, (value, ))
self.cursor.execute(query, (value,))
self.connection.commit()
def set_download_data(self, track_id: str, url: str, src: str):
@ -249,7 +250,7 @@ WHERE '{track_id}' == id;
"""
self.cursor.execute(query, (url, src))
self.connection.commit()
query = "INSERT OR REPLACE INTO source (track_id, src, url) VALUES (?, ?, ?);"
self.cursor.execute(query, (track_id, src, url))
self.connection.commit()

View File

@ -1,4 +0,0 @@
class Lyrics:
def __init__(self, text: str, language: str) -> None:
self.text = text
self.language = language

View File

@ -5,9 +5,14 @@ from .metadata import Metadata
from .source import Source
from .target import Target
# I don't import cache from the db module because it would lead to circular imports
# from .temp_database import temp_database as cache
# from . import cache
class Song:
def __init__(self, json_response) -> None:
def __init__(self, json_response: dict) -> None:
self.json_data = json_response
# initialize the data
@ -44,6 +49,10 @@ class Song:
self.metadata['artist'] = self.get_artist_names()
# EasyID3.valid_keys.keys()
# the lyrics are not in the metadata class because the field isn't supported
# by easyid3
self.lyrics: LyricsContainer = LyricsContainer(parent=self)
def __str__(self) -> str:
return f"\"{self.title}\" by {', '.join([str(a) for a in self.artists])}"
@ -73,3 +82,36 @@ class Song:
return
self.json_data[item] = value
class Lyrics:
def __init__(self, text: str, language: str) -> None:
self.text = text
self.language = language
class LyricsContainer:
def __init__(self, parent: Song):
self.lyrics_list: List[Lyrics] = []
self.parent = parent
def append(self, lyrics: Lyrics):
# due to my db not supporting multiple Lyrics yet, I just use for doing stuff with the lyrics
# the first element. I know this implementation is junk, but take it or leave it, it is going
# soon anyway
if len(self.lyrics_list) >= 1:
return
self.lyrics_list.append(lyrics)
# unfortunately can't do this here directly, because of circular imports. If anyone
# took the time to get familiar with this codebase... thank you, and if you have any
# suggestion of resolving this, please open an issue.
# cache.add_lyrics(track_id=self.parent.id, lyrics=lyrics.text)
def extend(self, lyrics_list: List[Lyrics]):
for lyrics in lyrics_list:
self.append(lyrics)
is_empty = property(fget=lambda self: len(self.lyrics_list) <= 0)

View File

@ -7,9 +7,10 @@ from ..utils.shared import (
DATABASE_LOGGER
)
class TempDatabase(Database):
def __init__(self) -> None:
super().__init__(TEMP_DATABASE_PATH, DATABASE_STRUCTURE_FILE, DATABASE_STRUCTURE_FALLBACK, DATABASE_LOGGER, False)
super().__init__(TEMP_DATABASE_PATH, DATABASE_STRUCTURE_FILE, DATABASE_STRUCTURE_FALLBACK, False)
temp_database = TempDatabase()

View File

@ -4,7 +4,9 @@ from bs4 import BeautifulSoup
import pycountry
from ..database import (
Lyrics
Lyrics,
Song,
Artist
)
from ..utils.shared import *
from ..utils import phonetic_compares
@ -25,7 +27,7 @@ session.proxies = proxies
logger = GENIUS_LOGGER
class Song:
class LyricsSong:
def __init__(self, raw_data: dict, desirered_data: dict):
self.raw_data = raw_data
self.desired_data = desirered_data
@ -72,7 +74,7 @@ class Song:
r = session.get(self.url)
if r.status_code != 200:
logging.warning(f"{r.url} returned {r.status_code}:\n{r.content}")
logger.warning(f"{r.url} returned {r.status_code}:\n{r.content}")
return None
soup = BeautifulSoup(r.content, "html.parser")
@ -80,23 +82,29 @@ class Song:
if len(lyrics_soups) == 0:
logger.warning(f"didn't found lyrics on {self.url}")
return None
if len(lyrics_soups) != 1:
logger.warning(f"number of lyrics_soups doesn't equals 1, but {len(lyrics_soups)} on {self.url}")
# if len(lyrics_soups) != 1:
# logger.warning(f"number of lyrics_soups doesn't equals 1, but {len(lyrics_soups)} on {self.url}")
lyrics = "\n".join([lyrics_soup.getText(separator="\n", strip=True) for lyrics_soup in lyrics_soups])
print(lyrics)
# <div data-lyrics-container="true" class="Lyrics__Container-sc-1ynbvzw-6 YYrds">With the soundle
self.lyrics = lyrics
return lyrics
def get_lyrics_object(self) -> Lyrics | None:
if self.lyrics is None:
return None
return Lyrics(text=self.lyrics, language=self.lang or "en")
def process_multiple_songs(song_datas: list, desired_data: dict) -> List[Song]:
all_songs = [Song(song_data, desired_data) for song_data in song_datas]
lyrics_object = property(fget=get_lyrics_object)
def process_multiple_songs(song_datas: list, desired_data: dict) -> List[LyricsSong]:
all_songs = [LyricsSong(song_data, desired_data) for song_data in song_datas]
return all_songs
def search_song_list(artist: str, track: str) -> List[Song]:
def search_song_list(artist: str, track: str) -> List[LyricsSong]:
endpoint = "https://genius.com/api/search/multi?q="
url = f"{endpoint}{artist} - {track}"
logging.info(f"requesting {url}")
@ -124,18 +132,31 @@ def search_song_list(artist: str, track: str) -> List[Song]:
return []
def search(artist: str, track: str) -> list:
results = []
r = search_song_list(artist, track)
for r_ in r:
if r_.valid:
results.append(r_)
return results
def fetch_lyrics_from_artist(song: Song, artist: Artist) -> List[Lyrics]:
lyrics_list: List[Lyrics] = []
lyrics_song_list = search_song_list(artist.name, song.title)
for lyrics_song in lyrics_song_list:
if lyrics_song.valid:
lyrics_list.append(lyrics_song.lyrics_object)
return lyrics_list
def fetch_lyrics(song: Song) -> List[Lyrics]:
lyrics: List[Lyrics] = []
for artist in song.artists:
lyrics.extend(fetch_lyrics_from_artist(song, artist))
return lyrics
"""
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
songs = search("Zombiez", "WALL OF Z")
for song in songs:
print(song)
"""

View File

@ -1,3 +1,5 @@
from typing import List
import mutagen
from mutagen.id3 import ID3, USLT
@ -5,7 +7,9 @@ from ..utils.shared import *
from . import genius
from ..database import (
Song,
cache
cache,
Lyrics,
Target
)
logger = LYRICS_LOGGER
@ -34,36 +38,43 @@ I have written that Rhythmbox plugin: https://github.com/HeIIow2/rythmbox-id3-ly
# https://code.activestate.com/recipes/577138-embed-lyrics-into-mp3-files-using-mutagen-uslt-tag/
def add_lyrics(file_path: str, lyrics):
if not os.path.exists(file_path):
def add_lyrics(target: Target, lyrics):
if not os.path.exists(target.file):
return
try:
tags = ID3(file_path)
tags = ID3(target.file)
except mutagen.id3.ID3NoHeaderError:
return
logger.info(f"adding lyrics to the file {file_path}")
logger.info(f"adding lyrics to the file {target.file}")
uslt_output = USLT(encoding=3, lang=lyrics.lang, desc=u'desc', text=lyrics.lyrics)
uslt_output = USLT(encoding=3, lang=lyrics.language, desc=u'desc', text=lyrics.text)
tags["USLT::'eng'"] = uslt_output
tags.save(file_path)
tags.save(target.file)
def fetch_single_lyrics(song: Song):
logger.info(f"try fetching lyrics for {song}")
lyrics = []
lyrics_list: List[Lyrics] = genius.fetch_lyrics(song)
"""
for artist in song.get_artist_names():
lyrics.extend(genius.search(artist, song.title))
if len(lyrics) == 0:
"""
if len(lyrics_list) == 0:
return
logger.info("found lyrics")
cache.add_lyrics(song.id, lyrics=lyrics[0])
add_lyrics(song.target.file, lyrics[0])
logger.info(f"found lyrics for {song}")
song.lyrics.extend(lyrics_list)
print(lyrics_list)
cache.add_lyrics(song=song, lyrics=lyrics_list[0])
add_lyrics(song.target, lyrics_list[0])
# cache.add_lyrics(song.id, lyrics=lyrics[0])
# add_lyrics(song.target.file, lyrics[0])
def fetch_lyrics():
for song in cache.get_tracks_for_lyrics():
def fetch_lyrics(songs: List[Song]):
for song in songs:
fetch_single_lyrics(song)