diff --git a/README.md b/README.md index a82d5ff..8bccd6d 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Music Kraken - +music kraken logo - [Music Kraken](#music-kraken) - [Installation](#installation) @@ -52,9 +52,19 @@ If you choose to run it in WSL, make sure ` ~/.local/bin` is added to your `$PAT ## Quick-Guide -**Genre:** First, the cli asks you to input a genre you want to download to. The options it gives you (if it gives you any) are all the folders you have in the music directory. You can also just input a new one. +The **Genre** you define at the start is the folder my program will download the files into, AS WELL as the value of the ID3 genre field. -**What to download:** After that it prompts you for a search. Here are a couple examples how you can search: +When it drops you into the **shell** 2 main things are important: + +1. You search with `s: ` +2. You choose an option with just the index number of the option +3. You download with `d: `, where the options are comma-separated + +Trust me it WILL make sense, once you see it. + +### Query + +The syntax for the query is like really simple. ``` > #a @@ -67,9 +77,9 @@ searches for the release (album) by the artist searches for the track from the release ``` -After searching with this syntax, it prompts you with multiple results. You can either choose one of those by inputing its id `int`, or you can search for a new query. +For a more detailed guide of the downloading shell, see [here](documentation/shell.md). -After you chose either an artist, a release group, a release, or a track by its id, download it by inputting the string `ok`. My downloader will download it automatically for you. +LOVE YALL *(except nazis ;-;)* --- diff --git a/contribute.md b/contribute.md index 1750781..34ba719 100644 --- a/contribute.md +++ b/contribute.md @@ -1,3 +1,5 @@ +> IMPORTANT NOTE: heavily outdated sorryyyyy + # How to contribute I am always happy about pull requests. 
diff --git a/src/music_kraken/download/page_attributes.py b/src/music_kraken/download/page_attributes.py index 11598d0..ee7c18d 100644 --- a/src/music_kraken/download/page_attributes.py +++ b/src/music_kraken/download/page_attributes.py @@ -5,15 +5,17 @@ from ..objects import DatabaseObject, Source from ..utils.enums.source import SourcePages from ..utils.support_classes import Query, DownloadResult from ..utils.exception.download import UrlNotFoundException -from ..pages import Page, EncyclopaediaMetallum, Musify, INDEPENDENT_DB_OBJECTS +from ..pages import Page, EncyclopaediaMetallum, Musify, YouTube, INDEPENDENT_DB_OBJECTS ALL_PAGES: Set[Type[Page]] = { EncyclopaediaMetallum, - Musify + Musify, + YouTube, } AUDIO_PAGES: Set[Type[Page]] = { Musify, + YouTube, } SHADY_PAGES: Set[Type[Page]] = { diff --git a/src/music_kraken/not_used_anymore/__init__.py b/src/music_kraken/not_used_anymore/__init__.py deleted file mode 100644 index 139597f..0000000 --- a/src/music_kraken/not_used_anymore/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ - - diff --git a/src/music_kraken/not_used_anymore/fetch_audio.py b/src/music_kraken/not_used_anymore/fetch_audio.py deleted file mode 100644 index 1793f12..0000000 --- a/src/music_kraken/not_used_anymore/fetch_audio.py +++ /dev/null @@ -1,106 +0,0 @@ -from typing import List -import mutagen.id3 -import requests -import os.path -from mutagen.easyid3 import EasyID3 -from pydub import AudioSegment - -from ..utils.shared import * -from .sources import ( - youtube, - musify, - local_files -) -from ..database.song import ( - Song as song_object, - Target as target_object, - Source as source_object -) -from ..database.temp_database import temp_database - -logger = DOWNLOAD_LOGGER - -# maps the classes to get data from to the source name -sources = { - 'Youtube': youtube.Youtube, - 'Musify': musify.Musify -} - -""" -https://en.wikipedia.org/wiki/ID3 -https://mutagen.readthedocs.io/en/latest/user/id3.html - -# to get all valid keys -from 
mutagen.easyid3 import EasyID3 -print("\n".join(EasyID3.valid_keys.keys())) -print(EasyID3.valid_keys.keys()) -""" - - -class Download: - def __init__(self): - Download.fetch_audios(temp_database.get_tracks_to_download()) - - @classmethod - def fetch_audios(cls, songs: List[song_object], override_existing: bool = False): - for song in songs: - if not cls.path_stuff(song.target) and not override_existing: - cls.write_metadata(song) - continue - - is_downloaded = False - for source in song.sources: - download_success = Download.download_from_src(song, source) - - if download_success == -1: - logger.warning(f"couldn't download {song['url']} from {song['src']}") - else: - is_downloaded = True - break - - if is_downloaded: - cls.write_metadata(song) - - @classmethod - def download_from_src(cls, song: song_object, source: source_object): - if source.src not in sources: - raise ValueError(f"source {source.src} seems to not exist") - source_subclass = sources[source.src] - - return source_subclass.fetch_audio(song, source) - - @classmethod - def write_metadata(cls, song: song_object): - if not os.path.exists(song.target.file): - logger.warning(f"file {song.target.file} doesn't exist") - return False - - # only convert the file to the proper format if mutagen doesn't work with it due to time - try: - audiofile = EasyID3(song.target.file) - except mutagen.id3.ID3NoHeaderError: - AudioSegment.from_file(song.target.file).export(song.target.file, format="mp3") - audiofile = EasyID3(song.target.file) - - for key, value in song.get_metadata(): - if type(value) != list: - value = str(value) - audiofile[key] = value - - logger.info("saving") - audiofile.save(song.target.file, v1=2) - - @classmethod - def path_stuff(cls, target: target_object) -> bool: - # returns true if it should be downloaded - if os.path.exists(target.file): - logger.info(f"'{target.file}' does already exist, thus not downloading.") - return False - os.makedirs(target.path, exist_ok=True) - return True - - -if 
__name__ == "__main__": - logging.basicConfig(level=logging.DEBUG) - s = requests.Session() - Download() diff --git a/src/music_kraken/not_used_anymore/fetch_source.py b/src/music_kraken/not_used_anymore/fetch_source.py deleted file mode 100644 index e28479c..0000000 --- a/src/music_kraken/not_used_anymore/fetch_source.py +++ /dev/null @@ -1,70 +0,0 @@ -from typing import List - -from ..utils.shared import * -from .sources import ( - youtube, - musify, - local_files -) -from ..database.song import Song as song_object -from ..database.temp_database import temp_database - -logger = URL_DOWNLOAD_LOGGER - -# maps the classes to get data from to the source name -sources = { - 'Youtube': youtube.Youtube, - 'Musify': musify.Musify -} - - -class Download: - def __init__(self) -> None: - for song in temp_database.get_tracks_without_src(): - id_ = song['id'] - if os.path.exists(song.target.file): - logger.info(f"skipping the fetching of the download links, cuz {song.target.file} already exists.") - continue - - success = False - for src in AUDIO_SOURCES: - res = Download.fetch_from_src(song, src) - if res is not None: - success = True - Download.add_url(res, src, id_) - - if not success: - logger.warning(f"Didn't find any sources for {song}") - - @classmethod - def fetch_sources(cls, songs: List[song_object], skip_existing_files: bool = False): - for song in songs: - if song.target.exists_on_disc and skip_existing_files: - logger.info(f"skipping the fetching of the download links, cuz {song.target.file} already exists.") - continue - - success = False - for src in AUDIO_SOURCES: - res = cls.fetch_from_src(song, src) - if res is not None: - success = True - cls.add_url(res, src, song.id) - - if not success: - logger.warning(f"Didn't find any sources for {song}") - - @classmethod - def fetch_from_src(cls, song, src): - if src not in sources: - raise ValueError(f"source {src} seems to not exist") - - source_subclass = sources[src] - return source_subclass.fetch_source(song) - - 
@classmethod - def add_url(cls, url: str, src: str, id_: str): - temp_database.set_download_data(id_, url, src) - - -if __name__ == "__main__": - download = Download() diff --git a/src/music_kraken/not_used_anymore/metadata/__init__.py b/src/music_kraken/not_used_anymore/metadata/__init__.py deleted file mode 100644 index ec153f9..0000000 --- a/src/music_kraken/not_used_anymore/metadata/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -from . import ( - metadata_search, - metadata_fetch -) - -MetadataSearch = metadata_search.Search -MetadataDownload = metadata_fetch.MetadataDownloader diff --git a/src/music_kraken/not_used_anymore/metadata/metadata_fetch.py b/src/music_kraken/not_used_anymore/metadata/metadata_fetch.py deleted file mode 100644 index 7f71c86..0000000 --- a/src/music_kraken/not_used_anymore/metadata/metadata_fetch.py +++ /dev/null @@ -1,345 +0,0 @@ -from src.music_kraken.utils.shared import * -from src.music_kraken.utils.object_handeling import get_elem_from_obj, parse_music_brainz_date - -from src.music_kraken.database.temp_database import temp_database - -from typing import List -import musicbrainzngs -import logging - -# I don't know if it would be feesable to set up my own mb instance -# https://github.com/metabrainz/musicbrainz-docker - - -# IMPORTANT DOCUMENTATION WHICH CONTAINS FOR EXAMPLE THE INCLUDES -# https://python-musicbrainzngs.readthedocs.io/en/v0.7.1/api/#getting-data - -logger = METADATA_DOWNLOAD_LOGGER - - -class MetadataDownloader: - def __init__(self): - pass - - class Artist: - def __init__( - self, - musicbrainz_artistid: str, - release_groups: List = [], - new_release_groups: bool = True - ): - """ - release_groups: list - """ - self.release_groups = release_groups - - self.musicbrainz_artistid = musicbrainz_artistid - - try: - result = musicbrainzngs.get_artist_by_id(self.musicbrainz_artistid, includes=["release-groups", "releases"]) - except musicbrainzngs.musicbrainz.NetworkError: - return - artist_data = get_elem_from_obj(result, 
['artist'], return_if_none={}) - - self.artist = get_elem_from_obj(artist_data, ['name']) - - self.save() - - # STARTING TO FETCH' RELEASE GROUPS. IMPORTANT: DON'T WRITE ANYTHING BESIDES THAT HERE - if not new_release_groups: - return - # sort all release groups by date and add album sort to have them in chronological order. - release_groups = artist_data['release-group-list'] - for i, release_group in enumerate(release_groups): - release_groups[i]['first-release-date'] = parse_music_brainz_date(release_group['first-release-date']) - release_groups.sort(key=lambda x: x['first-release-date']) - - for i, release_group in enumerate(release_groups): - self.release_groups.append(MetadataDownloader.ReleaseGroup( - musicbrainz_releasegroupid=release_group['id'], - artists=[self], - albumsort=i + 1 - )) - - def __str__(self): - newline = "\n" - return f"artist: \"{self.artist}\"" - - def save(self): - logger.info(f"caching {self}") - temp_database.add_artist( - musicbrainz_artistid=self.musicbrainz_artistid, - artist=self.artist - ) - - class ReleaseGroup: - def __init__( - self, - musicbrainz_releasegroupid: str, - artists=[], - albumsort: int = None, - only_download_distinct_releases: bool = True, - fetch_further: bool = True - ): - """ - split_artists: list -> if len > 1: album_artist=VariousArtists - releases: list - """ - - self.musicbrainz_releasegroupid = musicbrainz_releasegroupid - self.artists = artists - self.releases = [] - - try: - result = musicbrainzngs.get_release_group_by_id(musicbrainz_releasegroupid, - includes=["artist-credits", "releases"]) - except musicbrainzngs.musicbrainz.NetworkError: - return - release_group_data = get_elem_from_obj(result, ['release-group'], return_if_none={}) - artist_datas = get_elem_from_obj(release_group_data, ['artist-credit'], return_if_none={}) - release_datas = get_elem_from_obj(release_group_data, ['release-list'], return_if_none={}) - - # only for printing the release - self.name = get_elem_from_obj(release_group_data, 
['title']) - - for artist_data in artist_datas: - artist_id = get_elem_from_obj(artist_data, ['artist', 'id']) - if artist_id is None: - continue - self.append_artist(artist_id) - self.albumartist = "Various Artists" if len(self.artists) > 1 else self.artists[0].artist - self.album_artist_id = None if self.albumartist == "Various Artists" else self.artists[ - 0].musicbrainz_artistid - - self.albumsort = albumsort - self.musicbrainz_albumtype = get_elem_from_obj(release_group_data, ['primary-type']) - self.compilation = "1" if self.musicbrainz_albumtype == "Compilation" else None - - self.save() - - if not fetch_further: - return - - if only_download_distinct_releases: - self.append_distinct_releases(release_datas) - else: - self.append_all_releases(release_datas) - - def __str__(self): - return f"release group: \"{self.name}\"" - - def save(self): - logger.info(f"caching {self}") - temp_database.add_release_group( - musicbrainz_releasegroupid=self.musicbrainz_releasegroupid, - artist_ids=[artist.musicbrainz_artistid for artist in self.artists], - albumartist=self.albumartist, - albumsort=self.albumsort, - musicbrainz_albumtype=self.musicbrainz_albumtype, - compilation=self.compilation, - album_artist_id=self.album_artist_id - ) - - def append_artist(self, artist_id: str): - for existing_artist in self.artists: - if artist_id == existing_artist.musicbrainz_artistid: - return existing_artist - new_artist = MetadataDownloader.Artist(artist_id, release_groups=[self], - new_release_groups=False) - self.artists.append(new_artist) - return new_artist - - def append_release(self, release_data: dict): - musicbrainz_albumid = get_elem_from_obj(release_data, ['id']) - if musicbrainz_albumid is None: - return - self.releases.append( - MetadataDownloader.Release(musicbrainz_albumid, release_group=self)) - - def append_distinct_releases(self, release_datas: List[dict]): - titles = {} - - for release_data in release_datas: - title = get_elem_from_obj(release_data, ['title']) - if 
title is None: - continue - titles[title] = release_data - - for key in titles: - self.append_release(titles[key]) - - def append_all_releases(self, release_datas: List[dict]): - for release_data in release_datas: - self.append_release(release_data) - - class Release: - def __init__( - self, - musicbrainz_albumid: str, - release_group=None, - fetch_furter: bool = True - ): - """ - release_group: ReleaseGroup - tracks: list - """ - self.musicbrainz_albumid = musicbrainz_albumid - self.release_group = release_group - self.tracklist = [] - - try: - result = musicbrainzngs.get_release_by_id(self.musicbrainz_albumid, - includes=["recordings", "labels", "release-groups"]) - except musicbrainzngs.musicbrainz.NetworkError: - return - release_data = get_elem_from_obj(result, ['release'], return_if_none={}) - label_data = get_elem_from_obj(release_data, ['label-info-list'], return_if_none={}) - recording_datas = get_elem_from_obj(release_data, ['medium-list', 0, 'track-list'], return_if_none=[]) - release_group_data = get_elem_from_obj(release_data, ['release-group'], return_if_none={}) - if self.release_group is None: - self.release_group = MetadataDownloader.ReleaseGroup( - musicbrainz_releasegroupid=get_elem_from_obj( - release_group_data, ['id']), - fetch_further=False) - - self.title = get_elem_from_obj(release_data, ['title']) - self.copyright = get_elem_from_obj(label_data, [0, 'label', 'name']) - - self.album_status = get_elem_from_obj(release_data, ['status']) - self.language = get_elem_from_obj(release_data, ['text-representation', 'language']) - self.year = get_elem_from_obj(release_data, ['date'], lambda x: x.split("-")[0]) - self.date = get_elem_from_obj(release_data, ['date']) - self.country = get_elem_from_obj(release_data, ['country']) - self.barcode = get_elem_from_obj(release_data, ['barcode']) - - self.save() - if fetch_furter: - self.append_recordings(recording_datas) - - def __str__(self): - return f"release: {self.title} ©{self.copyright} 
{self.album_status}" - - def save(self): - logger.info(f"caching {self}") - temp_database.add_release( - musicbrainz_albumid=self.musicbrainz_albumid, - release_group_id=self.release_group.musicbrainz_releasegroupid, - title=self.title, - copyright_=self.copyright, - album_status=self.album_status, - language=self.language, - year=self.year, - date=self.date, - country=self.country, - barcode=self.barcode - ) - - def append_recordings(self, recording_datas: dict): - for i, recording_data in enumerate(recording_datas): - musicbrainz_releasetrackid = get_elem_from_obj(recording_data, ['recording', 'id']) - if musicbrainz_releasetrackid is None: - continue - - self.tracklist.append( - MetadataDownloader.Track(musicbrainz_releasetrackid, self, - track_number=str(i + 1))) - - class Track: - def __init__( - self, - musicbrainz_releasetrackid: str, - release=None, - track_number: str = None - ): - """ - release: Release - feature_artists: list - """ - - self.musicbrainz_releasetrackid = musicbrainz_releasetrackid - self.release = release - self.artists = [] - - self.track_number = track_number - - try: - result = musicbrainzngs.get_recording_by_id(self.musicbrainz_releasetrackid, - includes=["artists", "releases", "recording-rels", "isrcs", - "work-level-rels"]) - except musicbrainzngs.musicbrainz.NetworkError: - return - recording_data = result['recording'] - release_data = get_elem_from_obj(recording_data, ['release-list', -1]) - if self.release is None: - self.release = MetadataDownloader.Release(get_elem_from_obj(release_data, ['id']), fetch_furter=False) - - for artist_data in get_elem_from_obj(recording_data, ['artist-credit'], return_if_none=[]): - self.append_artist(get_elem_from_obj(artist_data, ['artist', 'id'])) - - self.isrc = get_elem_from_obj(recording_data, ['isrc-list', 0]) - self.title = recording_data['title'] - - self.lenth = get_elem_from_obj(recording_data, ['length']) - - self.save() - - def __str__(self): - return f"track: \"{self.title}\" 
{self.isrc or ''}" - - def save(self): - logger.info(f"caching {self}") - - temp_database.add_track( - musicbrainz_releasetrackid=self.musicbrainz_releasetrackid, - musicbrainz_albumid=self.release.musicbrainz_albumid, - feature_aritsts=[artist.musicbrainz_artistid for artist in self.artists], - tracknumber=self.track_number, - track=self.title, - isrc=self.isrc, - length=int(self.lenth) - ) - - def append_artist(self, artist_id: str): - if artist_id is None: - return - - for existing_artist in self.artists: - if artist_id == existing_artist.musicbrainz_artistid: - return existing_artist - new_artist = MetadataDownloader.Artist(artist_id, new_release_groups=False) - self.artists.append(new_artist) - return new_artist - - def download(self, option: dict): - type_ = option['type'] - mb_id = option['id'] - - if type_ == "artist": - return self.Artist(mb_id) - if type_ == "release_group": - return self.ReleaseGroup(mb_id) - if type_ == "release": - return self.Release(mb_id) - if type_ == "recording": - return self.Track(mb_id) - - logger.error(f"download type {type_} doesn't exists :(") - - - -if __name__ == "__main__": - logging.basicConfig( - level=logging.INFO, - format="%(asctime)s [%(levelname)s] %(message)s", - handlers=[ - logging.FileHandler(os.path.join(TEMP_DIR, LOG_FILE)), - logging.StreamHandler() - ] - ) - - downloader = MetadataDownloader() - - downloader.download({'id': 'd2006339-9e98-4624-a386-d503328eb854', 'type': 'recording'}) - downloader.download({'id': 'cdd16860-35fd-46af-bd8c-5de7b15ebc31', 'type': 'release'}) - # download({'id': '4b9af532-ef7e-42ab-8b26-c466327cb5e0', 'type': 'release'}) - #download({'id': 'c24ed9e7-6df9-44de-8570-975f1a5a75d1', 'type': 'track'}) diff --git a/src/music_kraken/not_used_anymore/metadata/metadata_search.py b/src/music_kraken/not_used_anymore/metadata/metadata_search.py deleted file mode 100644 index bae25e4..0000000 --- a/src/music_kraken/not_used_anymore/metadata/metadata_search.py +++ /dev/null @@ -1,364 +0,0 @@ 
-from typing import List -import musicbrainzngs - -from src.music_kraken.utils.shared import * -from src.music_kraken.utils.object_handeling import get_elem_from_obj, parse_music_brainz_date - -logger = SEARCH_LOGGER - -MAX_PARAMETERS = 3 -OPTION_TYPES = ['artist', 'release_group', 'release', 'recording'] - - -class Option: - def __init__(self, type_: str, id_: str, name: str, additional_info: str = "") -> None: - # print(type_, id_, name) - if type_ not in OPTION_TYPES: - raise ValueError(f"type: {type_} doesn't exist. Legal Values: {OPTION_TYPES}") - self.type = type_ - self.name = name - self.id = id_ - - self.additional_info = additional_info - - def __getitem__(self, item): - map_ = { - "id": self.id, - "type": self.type, - "kind": self.type, - "name": self.name - } - return map_[item] - - def __repr__(self) -> str: - type_repr = { - 'artist': 'artist\t\t', - 'release_group': 'release group\t', - 'release': 'release\t\t', - 'recording': 'recording\t' - } - return f"{type_repr[self.type]}: \"{self.name}\"{self.additional_info}" - - -class MultipleOptions: - def __init__(self, option_list: List[Option]) -> None: - self.option_list = option_list - - def __repr__(self) -> str: - return "\n".join([f"{str(i).zfill(2)}) {choice.__repr__()}" for i, choice in enumerate(self.option_list)]) - - -class Search: - def __init__(self) -> None: - self.options_history = [] - self.current_option: Option - - def append_new_choices(self, new_choices: List[Option]) -> MultipleOptions: - self.options_history.append(new_choices) - return MultipleOptions(new_choices) - - def get_previous_options(self): - self.options_history.pop(-1) - return MultipleOptions(self.options_history[-1]) - - @staticmethod - def fetch_new_options_from_artist(artist: Option): - """ - returning list of artist and every release group - """ - result = musicbrainzngs.get_artist_by_id(artist.id, includes=["release-groups", "releases"]) - artist_data = get_elem_from_obj(result, ['artist'], return_if_none={}) - - 
result = [artist] - - # sort all release groups by date and add album sort to have them in chronological order. - release_group_list = artist_data['release-group-list'] - for i, release_group in enumerate(release_group_list): - release_group_list[i]['first-release-date'] = parse_music_brainz_date(release_group['first-release-date']) - release_group_list.sort(key=lambda x: x['first-release-date']) - release_group_list = [Option("release_group", get_elem_from_obj(release_group_, ['id']), - get_elem_from_obj(release_group_, ['title']), - additional_info=f" ({get_elem_from_obj(release_group_, ['type'])}) from {get_elem_from_obj(release_group_, ['first-release-date'])}") - for release_group_ in release_group_list] - - result.extend(release_group_list) - return result - - @staticmethod - def fetch_new_options_from_release_group(release_group: Option): - """ - returning list including the artists, the releases and the tracklist of the first release - """ - results = [] - - result = musicbrainzngs.get_release_group_by_id(release_group.id, - includes=["artist-credits", "releases"]) - release_group_data = get_elem_from_obj(result, ['release-group'], return_if_none={}) - artist_datas = get_elem_from_obj(release_group_data, ['artist-credit'], return_if_none={}) - release_datas = get_elem_from_obj(release_group_data, ['release-list'], return_if_none={}) - - # appending all the artists to results - for artist_data in artist_datas: - results.append(Option('artist', get_elem_from_obj(artist_data, ['artist', 'id']), - get_elem_from_obj(artist_data, ['artist', 'name']))) - - # appending initial release group - results.append(release_group) - - # appending all releases - first_release = None - for i, release_data in enumerate(release_datas): - results.append( - Option('release', get_elem_from_obj(release_data, ['id']), get_elem_from_obj(release_data, ['title']), - additional_info=f" ({get_elem_from_obj(release_data, ['status'])})")) - if i == 0: - first_release = results[-1] - - # 
append tracklist of first release - if first_release is not None: - results.extend(Search.fetch_new_options_from_release(first_release, only_tracklist=True)) - - return results - - @staticmethod - def fetch_new_options_from_release(release: Option, only_tracklist: bool = False): - """ - artists - release group - release - tracklist - """ - results = [] - result = musicbrainzngs.get_release_by_id(release.id, - includes=["recordings", "labels", "release-groups", "artist-credits"]) - release_data = get_elem_from_obj(result, ['release'], return_if_none={}) - label_data = get_elem_from_obj(release_data, ['label-info-list'], return_if_none={}) - recording_datas = get_elem_from_obj(release_data, ['medium-list', 0, 'track-list'], return_if_none=[]) - release_group_data = get_elem_from_obj(release_data, ['release-group'], return_if_none={}) - artist_datas = get_elem_from_obj(release_data, ['artist-credit'], return_if_none={}) - - # appending all the artists to results - for artist_data in artist_datas: - results.append(Option('artist', get_elem_from_obj(artist_data, ['artist', 'id']), - get_elem_from_obj(artist_data, ['artist', 'name']))) - - # appending the according release group - results.append(Option("release_group", get_elem_from_obj(release_group_data, ['id']), - get_elem_from_obj(release_group_data, ['title']), - additional_info=f" ({get_elem_from_obj(release_group_data, ['type'])}) from {get_elem_from_obj(release_group_data, ['first-release-date'])}")) - - # appending the release - results.append(release) - - # appending the tracklist, but first putting it in a list, in case of only_tracklist being True to - # return this instead - tracklist = [] - for i, recording_data in enumerate(recording_datas): - recording_data = recording_data['recording'] - tracklist.append(Option('recording', get_elem_from_obj(recording_data, ['id']), - get_elem_from_obj(recording_data, ['title']), - f" ({get_elem_from_obj(recording_data, ['length'])}) from 
{get_elem_from_obj(recording_data, ['artist-credit-phrase'])}")) - - if only_tracklist: - return tracklist - results.extend(tracklist) - return results - - @staticmethod - def fetch_new_options_from_record(recording: Option): - """ - artists, release, record - """ - results = [] - - result = musicbrainzngs.get_recording_by_id(recording.id, includes=["artists", "releases"]) - recording_data = result['recording'] - release_datas = get_elem_from_obj(recording_data, ['release-list']) - artist_datas = get_elem_from_obj(recording_data, ['artist-credit'], return_if_none={}) - - # appending all the artists to results - for artist_data in artist_datas: - results.append(Option('artist', get_elem_from_obj(artist_data, ['artist', 'id']), - get_elem_from_obj(artist_data, ['artist', 'name']))) - - # appending all releases - for i, release_data in enumerate(release_datas): - results.append( - Option('release', get_elem_from_obj(release_data, ['id']), get_elem_from_obj(release_data, ['title']), - additional_info=f" ({get_elem_from_obj(release_data, ['status'])})")) - - results.append(recording) - - return results - - def fetch_new_options(self) -> MultipleOptions: - if self.current_option is None: - return -1 - - result = [] - if self.current_option.type == 'artist': - result = self.fetch_new_options_from_artist(self.current_option) - elif self.current_option.type == 'release_group': - result = self.fetch_new_options_from_release_group(self.current_option) - elif self.current_option.type == 'release': - result = self.fetch_new_options_from_release(self.current_option) - elif self.current_option.type == 'recording': - result = self.fetch_new_options_from_record(self.current_option) - - return self.append_new_choices(result) - - def choose(self, index: int) -> MultipleOptions: - if len(self.options_history) == 0: - logging.error("initial query neaded before choosing") - return MultipleOptions([]) - - latest_options = self.options_history[-1] - if index >= len(latest_options): - 
logging.error("index outside of options") - return MultipleOptions([]) - - self.current_option = latest_options[index] - return self.fetch_new_options() - - @staticmethod - def search_recording_from_text(artist: str = None, release_group: str = None, recording: str = None, - query: str = None): - result = musicbrainzngs.search_recordings(artist=artist, release=release_group, recording=recording, - query=query) - recording_list = get_elem_from_obj(result, ['recording-list'], return_if_none=[]) - - resulting_options = [ - Option("recording", get_elem_from_obj(recording_, ['id']), get_elem_from_obj(recording_, ['title']), - additional_info=f" of {get_elem_from_obj(recording_, ['release-list', 0, 'title'])} by {get_elem_from_obj(recording_, ['artist-credit', 0, 'name'])}") - for recording_ in recording_list] - return resulting_options - - @staticmethod - def search_release_group_from_text(artist: str = None, release_group: str = None, query: str = None): - result = musicbrainzngs.search_release_groups(artist=artist, releasegroup=release_group, query=query) - release_group_list = get_elem_from_obj(result, ['release-group-list'], return_if_none=[]) - - resulting_options = [Option("release_group", get_elem_from_obj(release_group_, ['id']), - get_elem_from_obj(release_group_, ['title']), - additional_info=f" by {get_elem_from_obj(release_group_, ['artist-credit', 0, 'name'])}") - for release_group_ in release_group_list] - return resulting_options - - @staticmethod - def search_artist_from_text(artist: str = None, query: str = None): - result = musicbrainzngs.search_artists(artist=artist, query=query) - artist_list = get_elem_from_obj(result, ['artist-list'], return_if_none=[]) - - resulting_options = [Option("artist", get_elem_from_obj(artist_, ['id']), get_elem_from_obj(artist_, ['name']), - additional_info=f": {', '.join([i['name'] for i in get_elem_from_obj(artist_, ['tag-list'], return_if_none=[])])}") - for artist_ in artist_list] - return resulting_options - - def 
search_from_text(self, artist: str = None, release_group: str = None, recording: str = None) -> MultipleOptions: - logger.info( - f"searching specified artist: \"{artist}\", release group: \"{release_group}\", recording: \"{recording}\"") - if artist is None and release_group is None and recording is None: - logger.error("either artist, release group or recording has to be set") - return MultipleOptions([]) - - if recording is not None: - logger.info("search for recording") - results = self.search_recording_from_text(artist=artist, release_group=release_group, recording=recording) - elif release_group is not None: - logger.info("search for release group") - results = self.search_release_group_from_text(artist=artist, release_group=release_group) - else: - logger.info("search for artist") - results = self.search_artist_from_text(artist=artist) - - return self.append_new_choices(results) - - def search_from_text_unspecified(self, query: str) -> MultipleOptions: - logger.info(f"searching unspecified: \"{query}\"") - - results = [] - results.extend(self.search_artist_from_text(query=query)) - results.extend(self.search_release_group_from_text(query=query)) - results.extend(self.search_recording_from_text(query=query)) - - return self.append_new_choices(results) - - def search_from_query(self, query: str) -> MultipleOptions: - if query is None: - return MultipleOptions([]) - """ - mit # wird ein neuer Parameter gestartet - der Buchstabe dahinter legt die Art des Parameters fest - "#a Psychonaut 4 #r Tired, Numb and #t Drop by Drop" - if no # is in the query it gets treated as "unspecified query" - :param query: - :return: - """ - - if not '#' in query: - return self.search_from_text_unspecified(query) - - artist = None - release_group = None - recording = None - - query = query.strip() - parameters = query.split('#') - parameters.remove('') - - if len(parameters) > MAX_PARAMETERS: - raise ValueError(f"too many parameters. 
Only {MAX_PARAMETERS} are allowed") - - for parameter in parameters: - splitted = parameter.split(" ") - type_ = splitted[0] - input_ = " ".join(splitted[1:]).strip() - - if type_ == "a": - artist = input_ - continue - if type_ == "r": - release_group = input_ - continue - if type_ == "t": - recording = input_ - continue - - return self.search_from_text(artist=artist, release_group=release_group, recording=recording) - - -def automated_demo(): - search = Search() - search.search_from_text(artist="I Prevail") - - # choose an artist - search.choose(0) - # choose a release group - search.choose(9) - # choose a release - search.choose(2) - # choose a recording - search.choose(4) - - -def interactive_demo(): - search = Search() - while True: - input_ = input( - "q to quit, .. for previous options, int for this element, str to search for query, ok to download: ") - input_.strip() - if input_.lower() == "ok": - break - if input_.lower() == "q": - break - if input_.lower() == "..": - search.get_previous_options() - continue - if input_.isdigit(): - search.choose(int(input_)) - continue - search.search_from_query(input_) - - -if __name__ == "__main__": - interactive_demo() diff --git a/src/music_kraken/not_used_anymore/metadata/sources/__init__.py b/src/music_kraken/not_used_anymore/metadata/sources/__init__.py deleted file mode 100644 index e9dba5f..0000000 --- a/src/music_kraken/not_used_anymore/metadata/sources/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from enum import Enum - -class Providers(Enum): - musicbrainz = "musicbrainz" diff --git a/src/music_kraken/not_used_anymore/metadata/sources/musicbrainz.py b/src/music_kraken/not_used_anymore/metadata/sources/musicbrainz.py deleted file mode 100644 index b329bcc..0000000 --- a/src/music_kraken/not_used_anymore/metadata/sources/musicbrainz.py +++ /dev/null @@ -1,59 +0,0 @@ -from typing import List -import musicbrainzngs - -from src.music_kraken.database import ( - Artist, - Album, - Song -) -from 
src.music_kraken.utils.object_handeling import ( - get_elem_from_obj -) - - -def get_artist(flat: bool = False) -> Artist: - # getting the flat artist - artist_object = Artist() - if flat: - return artist_object - # get additional stuff like discography - return artist_object - - -def get_album(flat: bool = False) -> Album: - # getting the flat album object - album_object = Album() - if flat: - return album_object - # get additional stuff like tracklist - return album_object - - -def get_song(mb_id: str, flat: bool = False) -> Song: - # getting the flat song object - try: - result = musicbrainzngs.get_recording_by_id(mb_id, - includes=["artists", "releases", "recording-rels", "isrcs", - "work-level-rels"]) - except musicbrainzngs.musicbrainz.NetworkError: - return - - recording_data = result['recording'] - - song_object = Song( - mb_id=mb_id, - title=recording_data['title'], - length=get_elem_from_obj(recording_data, ['length']), - isrc=get_elem_from_obj(recording_data, ['isrc-list', 0]) - ) - if flat: - return song_object - - # fetch additional stuff - artist_data_list = get_elem_from_obj(recording_data, ['artist-credit'], return_if_none=[]) - for artist_data in artist_data_list: - mb_artist_id = get_elem_from_obj(artist_data, ['artist', 'id']) - - release_data = get_elem_from_obj(recording_data, ['release-list', -1]) - mb_release_id = get_elem_from_obj(release_data, ['id']) - return song_object diff --git a/src/music_kraken/not_used_anymore/sources/__init__.py b/src/music_kraken/not_used_anymore/sources/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/music_kraken/not_used_anymore/sources/genius.py b/src/music_kraken/not_used_anymore/sources/genius.py deleted file mode 100644 index 798b30c..0000000 --- a/src/music_kraken/not_used_anymore/sources/genius.py +++ /dev/null @@ -1,172 +0,0 @@ -import requests -from typing import List -from bs4 import BeautifulSoup -import pycountry - -from src.music_kraken.database import ( - Lyrics, - Song, 
- Artist -) -from src.music_kraken.utils.shared import * -from src.music_kraken.utils import phonetic_compares -from src.music_kraken.utils.object_handeling import get_elem_from_obj - -TIMEOUT = 10 - -# search doesn't support isrc -# https://genius.com/api/search/multi?q=I Prevail - Breaking Down -# https://genius.com/api/songs/6192944 -# https://docs.genius.com/ - -session = requests.Session() -session.headers = { - "Connection": "keep-alive", - "Referer": "https://genius.com/search/embed" -} -session.proxies = proxies - -logger = GENIUS_LOGGER - - -class LyricsSong: - def __init__(self, raw_data: dict, desirered_data: dict): - self.raw_data = raw_data - self.desired_data = desirered_data - - song_data = get_elem_from_obj(self.raw_data, ['result'], return_if_none={}) - self.id = get_elem_from_obj(song_data, ['id']) - self.artist = get_elem_from_obj(song_data, ['primary_artist', 'name']) - self.title = get_elem_from_obj(song_data, ['title']) - - lang_code = get_elem_from_obj(song_data, ['language']) or "en" - self.language = pycountry.languages.get(alpha_2=lang_code) - self.lang = self.language.alpha_3 - self.url = get_elem_from_obj(song_data, ['url']) - - # maybe could be implemented - self.lyricist: str - - if get_elem_from_obj(song_data, ['lyrics_state']) != "complete": - logger.warning( - f"lyrics state of {self.title} by {self.artist} is not complete but {get_elem_from_obj(song_data, ['lyrics_state'])}") - - self.valid = self.is_valid() - if self.valid: - logger.info(f"found lyrics for \"{self.__repr__()}\"") - else: - return - - self.lyrics = self.fetch_lyrics() - if self.lyrics is None: - self.valid = False - - def is_valid(self) -> bool: - title_match, title_distance = phonetic_compares.match_titles(self.title, self.desired_data['track']) - artist_match, artist_distance = phonetic_compares.match_artists(self.desired_data['artist'], self.artist) - - return not title_match and not artist_match - - def __repr__(self) -> str: - return f"{self.title} by 
{self.artist} ({self.url})" - - def fetch_lyrics(self) -> str | None: - if not self.valid: - logger.warning(f"{self.__repr__()} is invalid but the lyrics still get fetched. Something could be wrong.") - - try: - r = session.get(self.url, timeout=TIMEOUT) - except requests.exceptions.Timeout: - logger.warning(f"{self.url} timed out after {TIMEOUT} seconds") - return None - if r.status_code != 200: - logger.warning(f"{r.url} returned {r.status_code}:\n{r.content}") - return None - - soup = BeautifulSoup(r.content, "html.parser") - lyrics_soups = soup.find_all('div', {'data-lyrics-container': "true"}) - if len(lyrics_soups) == 0: - logger.warning(f"didn't found lyrics on {self.url}") - return None - # if len(lyrics_soups) != 1: - # logger.warning(f"number of lyrics_soups doesn't equals 1, but {len(lyrics_soups)} on {self.url}") - - lyrics = "\n".join([lyrics_soup.getText(separator="\n", strip=True) for lyrics_soup in lyrics_soups]) - - #
With the soundle - self.lyrics = lyrics - return lyrics - - def get_lyrics_object(self) -> Lyrics | None: - if self.lyrics is None: - return None - return Lyrics(text=self.lyrics, language=self.lang or "en") - - lyrics_object = property(fget=get_lyrics_object) - - -def process_multiple_songs(song_datas: list, desired_data: dict) -> List[LyricsSong]: - all_songs = [LyricsSong(song_data, desired_data) for song_data in song_datas] - return all_songs - - -def search_song_list(artist: str, track: str) -> List[LyricsSong]: - endpoint = "https://genius.com/api/search/multi?q=" - url = f"{endpoint}{artist} - {track}" - logging.info(f"requesting {url}") - - desired_data = { - 'artist': artist, - 'track': track - } - - try: - r = session.get(url, timeout=TIMEOUT) - except requests.exceptions.Timeout: - logger.warning(f"{url} timed out after {TIMEOUT} seconds") - return [] - if r.status_code != 200: - logging.warning(f"{r.url} returned {r.status_code}:\n{r.content}") - return [] - content = r.json() - if get_elem_from_obj(content, ['meta', 'status']) != 200: - logging.warning(f"{r.url} returned {get_elem_from_obj(content, ['meta', 'status'])}:\n{content}") - return [] - - sections = get_elem_from_obj(content, ['response', 'sections']) - for section in sections: - section_type = get_elem_from_obj(section, ['type']) - if section_type == "song": - return process_multiple_songs(get_elem_from_obj(section, ['hits'], return_if_none=[]), desired_data) - - return [] - - -def fetch_lyrics_from_artist(song: Song, artist: Artist) -> List[Lyrics]: - lyrics_list: List[Lyrics] = [] - lyrics_song_list = search_song_list(artist.name, song.title) - - for lyrics_song in lyrics_song_list: - if lyrics_song.valid: - lyrics_list.append(lyrics_song.lyrics_object) - - return lyrics_list - - -def fetch_lyrics(song: Song) -> List[Lyrics]: - lyrics: List[Lyrics] = [] - - for artist in song.artists: - lyrics.extend(fetch_lyrics_from_artist(song, artist)) - - return lyrics - - -""" -if __name__ == 
"__main__": - logging.basicConfig(level=logging.DEBUG) - - songs = search("Zombiez", "WALL OF Z") - for song in songs: - print(song) -""" diff --git a/src/music_kraken/not_used_anymore/sources/local_files.py b/src/music_kraken/not_used_anymore/sources/local_files.py deleted file mode 100644 index 358d413..0000000 --- a/src/music_kraken/not_used_anymore/sources/local_files.py +++ /dev/null @@ -1,57 +0,0 @@ -import os - -from ...utils.shared import * -from ...utils import phonetic_compares - - -def is_valid(a1, a2, t1, t2) -> bool: - title_match, title_distance = phonetic_compares.match_titles(t1, t2) - artist_match, artist_distance = phonetic_compares.match_artists(a1, a2) - - return not title_match and not artist_match - - -def get_metadata(file): - artist = None - title = None - - audiofile = EasyID3(file) - artist = audiofile['artist'] - title = audiofile['title'] - - return artist, title - - -def check_for_song(folder, artists, title): - if not os.path.exists(folder): - return False - files = [os.path.join(folder, i) for i in os.listdir(folder)] - - for file in files: - artists_, title_ = get_metadata(file) - if is_valid(artists, artists_, title, title_): - return True - return False - - -def get_path(row): - title = row['title'] - artists = row['artists'] - path_ = os.path.join(MUSIC_DIR, row['path']) - - print(artists, title, path_) - check_for_song(path_, artists, title) - - return None - - -if __name__ == "__main__": - row = {'artists': ['Psychonaut 4'], 'id': '6b40186b-6678-4328-a4b8-eb7c9806a9fb', 'tracknumber': None, - 'titlesort ': None, 'musicbrainz_releasetrackid': '6b40186b-6678-4328-a4b8-eb7c9806a9fb', - 'musicbrainz_albumid': '0d229a02-74f6-4c77-8c20-6612295870ae', 'title': 'Sweet Decadance', 'isrc': None, - 'album': 'Neurasthenia', 'copyright': 'Talheim Records', 'album_status': 'Official', 'language': 'eng', - 'year': '2016', 'date': '2016-10-07', 'country': 'AT', 'barcode': None, 'albumartist': 'Psychonaut 4', - 'albumsort': None, 
'musicbrainz_albumtype': 'Album', 'compilation': None, - 'album_artist_id': 'c0c720b5-012f-4204-a472-981403f37b12', 'path': 'dsbm/Psychonaut 4/Neurasthenia', - 'file': 'dsbm/Psychonaut 4/Neurasthenia/Sweet Decadance.mp3', 'genre': 'dsbm', 'url': None, 'src': None} - print(get_path(row)) diff --git a/src/music_kraken/not_used_anymore/sources/musify.py b/src/music_kraken/not_used_anymore/sources/musify.py deleted file mode 100644 index bc8851b..0000000 --- a/src/music_kraken/not_used_anymore/sources/musify.py +++ /dev/null @@ -1,181 +0,0 @@ -import time - -import requests -import bs4 - -from ...utils.shared import * -from ...utils import phonetic_compares - -from .source import AudioSource -from ...database import song as song_objects - - -TRIES = 5 -TIMEOUT = 10 - -logger = MUSIFY_LOGGER - -session = requests.Session() -session.headers = { - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:106.0) Gecko/20100101 Firefox/106.0", - "Connection": "keep-alive", - "Referer": "https://musify.club/" -} -session.proxies = proxies - - -class Musify(AudioSource): - @classmethod - def fetch_source(cls, song: dict) -> str | None: - super().fetch_source(song) - - title = song.title - artists = song.get_artist_names() - - # trying to get a download link via the autocomplete api - for artist in artists: - url = cls.fetch_source_from_autocomplete(title=title, artist=artist) - if url is not None: - logger.info(f"found download link {url}") - return url - - # trying to get a download link via the html of the direct search page - for artist in artists: - url = cls.fetch_source_from_search(title=title, artist=artist) - if url is not None: - logger.info(f"found download link {url}") - return url - - logger.warning(f"Didn't find the audio on {cls.__name__}") - - @classmethod - def get_download_link(cls, track_url: str) -> str | None: - # https://musify.club/track/dl/18567672/rauw-alejandro-te-felicito-feat-shakira.mp3 - # /track/sundenklang-wenn-mein-herz-schreit-3883217' - - file_ = 
track_url.split("/")[-1] - if len(file_) == 0: - return None - musify_id = file_.split("-")[-1] - musify_name = "-".join(file_.split("-")[:-1]) - - return f"https://musify.club/track/dl/{musify_id}/{musify_name}.mp3" - - @classmethod - def fetch_source_from_autocomplete(cls, title: str, artist: str) -> str | None: - url = f"https://musify.club/search/suggestions?term={artist} - {title}" - - try: - logger.info(f"calling {url}") - r = session.get(url=url) - except requests.exceptions.ConnectionError: - logger.info("connection error occurred") - return None - if r.status_code == 200: - autocomplete = r.json() - for song in autocomplete: - if artist in song['label'] and "/track" in song['url']: - return cls.get_download_link(song['url']) - - return None - - @classmethod - def get_soup_of_search(cls, query: str, trie=0) -> bs4.BeautifulSoup | None: - url = f"https://musify.club/search?searchText={query}" - logger.debug(f"Trying to get soup from {url}") - try: - r = session.get(url, timeout=15) - except requests.exceptions.Timeout: - return None - if r.status_code != 200: - if r.status_code in [503] and trie < TRIES: - logging.warning(f"youtube blocked downloading. ({trie}-{TRIES})") - logging.warning(f"retrying in {TIMEOUT} seconds again") - time.sleep(TIMEOUT) - return cls.get_soup_of_search(query, trie=trie + 1) - - logging.warning("too many tries, returning") - return None - return bs4.BeautifulSoup(r.content, features="html.parser") - - @classmethod - def fetch_source_from_search(cls, title: str, artist: str) -> str | None: - query: str = f"{artist[0]} - {title}" - search_soup = cls.get_soup_of_search(query=query) - if search_soup is None: - return None - - # get the soup of the container with all track results - tracklist_container_soup = search_soup.find_all("div", {"class": "playlist"}) - if len(tracklist_container_soup) == 0: - return None - if len(tracklist_container_soup) != 1: - logger.warning("HTML Layout of https://musify.club changed. 
(or bug)") - tracklist_container_soup = tracklist_container_soup[0] - - tracklist_soup = tracklist_container_soup.find_all("div", {"class": "playlist__details"}) - - def parse_track_soup(_track_soup): - anchor_soups = _track_soup.find_all("a") - artist_ = anchor_soups[0].text.strip() - track_ = anchor_soups[1].text.strip() - url_ = anchor_soups[1]['href'] - return artist_, track_, url_ - - # check each track in the container, if they match - for track_soup in tracklist_soup: - artist_option, title_option, track_url = parse_track_soup(track_soup) - - title_match, title_distance = phonetic_compares.match_titles(title, title_option) - artist_match, artist_distance = phonetic_compares.match_artists(artist, artist_option) - - logging.debug(f"{(title, title_option, title_match, title_distance)}") - logging.debug(f"{(artist, artist_option, artist_match, artist_distance)}") - - if not title_match and not artist_match: - return cls.get_download_link(track_url) - - return None - - @classmethod - def download_from_musify(cls, target: song_objects.Target, url): - # returns if target hasn't been set - if target.path is None or target.file is None: - logger.warning(f"target hasn't been set. Can't download. 
Most likely a bug.") - return False - - # download the audio data - logger.info(f"downloading: '{url}'") - try: - r = session.get(url, timeout=TIMEOUT) - except requests.exceptions.ConnectionError: - return False - except requests.exceptions.ReadTimeout: - logger.warning(f"musify server didn't respond after {TIMEOUT} seconds") - return False - if r.status_code != 200: - if r.status_code == 404: - logger.warning(f"{r.url} was not found") - return False - if r.status_code == 503: - logger.warning(f"{r.url} raised an internal server error") - return False - logger.error(f"\"{url}\" returned {r.status_code}: {r.text}") - return False - - # write to the file and create folder if it doesn't exist - if not os.path.exists(target.path): - os.makedirs(target.path, exist_ok=True) - with open(target.file, "wb") as mp3_file: - mp3_file.write(r.content) - logger.info("finished") - return True - - @classmethod - def fetch_audio(cls, song: song_objects.Song, src: song_objects.Source): - super().fetch_audio(song, src) - return cls.download_from_musify(song.target, src.url) - - -if __name__ == "__main__": - pass diff --git a/src/music_kraken/not_used_anymore/sources/source.py b/src/music_kraken/not_used_anymore/sources/source.py deleted file mode 100644 index 9883866..0000000 --- a/src/music_kraken/not_used_anymore/sources/source.py +++ /dev/null @@ -1,23 +0,0 @@ -from ...utils.shared import * -from typing import Tuple - -from ...database import song as song_objects - - -logger = URL_DOWNLOAD_LOGGER - -""" -The class "Source" is the superclass every class for specific audio -sources inherits from. This gives the advantage of a consistent -calling of the functions do search for a song and to download it. 
-""" - - -class AudioSource: - @classmethod - def fetch_source(cls, row: dict): - logger.info(f"try getting source {row.title} from {cls.__name__}") - - @classmethod - def fetch_audio(cls, song: song_objects.Song, src: song_objects.Source): - logger.info(f"downloading {song}: {cls.__name__} {src.url} -> {song.target.file}") diff --git a/src/music_kraken/not_used_anymore/sources/youtube.py b/src/music_kraken/not_used_anymore/sources/youtube.py deleted file mode 100644 index f0051d3..0000000 --- a/src/music_kraken/not_used_anymore/sources/youtube.py +++ /dev/null @@ -1,98 +0,0 @@ -from typing import List - -import youtube_dl -import time - -from ...utils.shared import * -from ...utils import phonetic_compares -from .source import AudioSource - -from ...database import song as song_objects - - -logger = YOUTUBE_LOGGER - -YDL_OPTIONS = {'format': 'bestaudio', 'noplaylist': 'True'} -YOUTUBE_URL_KEY = 'webpage_url' -YOUTUBE_TITLE_KEY = 'title' -WAIT_BETWEEN_BLOCK = 10 -MAX_TRIES = 3 - -def youtube_length_to_mp3_length(youtube_len: float) -> int: - return int(youtube_len * 1000) - - -class Youtube(AudioSource): - @classmethod - def get_youtube_from_isrc(cls, isrc: str) -> List[dict]: - # https://stackoverflow.com/questions/63388364/searching-youtube-videos-using-youtube-dl - with youtube_dl.YoutubeDL(YDL_OPTIONS) as ydl: - try: - videos = ydl.extract_info(f"ytsearch:{isrc}", download=False)['entries'] - except youtube_dl.utils.DownloadError: - return [] - - return [{ - 'url': video[YOUTUBE_URL_KEY], - 'title': video[YOUTUBE_TITLE_KEY], - 'length': youtube_length_to_mp3_length(float(videos[0]['duration'])) - } for video in videos] - - @classmethod - def fetch_source(cls, song: song_objects.Song): - # https://stackoverflow.com/questions/63388364/searching-youtube-videos-using-youtube-dl - super().fetch_source(song) - - if not song.has_isrc(): - return None - - real_title = song.title.lower() - - final_result = None - results = cls.get_youtube_from_isrc(song.isrc) - for 
result in results: - video_title = result['title'].lower() - match, distance = phonetic_compares.match_titles(video_title, real_title) - - if match: - continue - - if not phonetic_compares.match_length(song.length, result['length']): - logger.warning(f"{song.length} doesn't match with {result}") - continue - - final_result = result - - if final_result is None: - return None - logger.info(f"found video {final_result}") - return final_result['url'] - - @classmethod - def fetch_audio(cls, song: song_objects.Song, src: song_objects.Source, trie: int=0): - super().fetch_audio(song, src) - if song.target.file is None or song.target.path is None: - logger.warning(f"target hasn't been set. Can't download. Most likely a bug.") - return False - - options = { - 'format': 'bestaudio/best', - 'keepvideo': False, - 'outtmpl': song.target.file - } - - # downloading - try: - with youtube_dl.YoutubeDL(options) as ydl: - ydl.download([src.url]) - - except youtube_dl.utils.DownloadError: - # retry when failing - logger.warning(f"youtube blocked downloading. 
({trie}-{MAX_TRIES})") - if trie >= MAX_TRIES: - logger.warning("too many tries, returning") - return False - logger.warning(f"retrying in {WAIT_BETWEEN_BLOCK} seconds again") - time.sleep(WAIT_BETWEEN_BLOCK) - return cls.fetch_audio(song, src, trie=trie + 1) - diff --git a/src/music_kraken/pages/__init__.py b/src/music_kraken/pages/__init__.py index a7e2a61..b562686 100644 --- a/src/music_kraken/pages/__init__.py +++ b/src/music_kraken/pages/__init__.py @@ -1,3 +1,5 @@ from .encyclopaedia_metallum import EncyclopaediaMetallum from .musify import Musify +from .youtube import YouTube + from .abstract import Page, INDEPENDENT_DB_OBJECTS diff --git a/src/music_kraken/pages/musify.py b/src/music_kraken/pages/musify.py index 46c8429..ed045b6 100644 --- a/src/music_kraken/pages/musify.py +++ b/src/music_kraken/pages/musify.py @@ -5,7 +5,6 @@ from typing import List, Optional, Type, Union from urllib.parse import urlparse import pycountry -import requests from bs4 import BeautifulSoup from ..connection import Connection @@ -20,7 +19,6 @@ from ..objects import ( ID3Timestamp, FormattedText, Label, - Options, Target, DatabaseObject ) diff --git a/src/music_kraken/pages/preset.py b/src/music_kraken/pages/preset.py index 7fc8212..5e940ba 100644 --- a/src/music_kraken/pages/preset.py +++ b/src/music_kraken/pages/preset.py @@ -12,8 +12,10 @@ from ..objects import ( Song, Album, Label, + Target ) from ..connection import Connection +from ..utils.support_classes import DownloadResult class Preset(Page): # CHANGE @@ -57,3 +59,6 @@ class Preset(Page): def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label: return Label() + + def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult: + return DownloadResult() diff --git a/src/music_kraken/pages/youtube.py b/src/music_kraken/pages/youtube.py index a5d6736..b23b1f8 100644 --- a/src/music_kraken/pages/youtube.py +++ b/src/music_kraken/pages/youtube.py @@ -1,46 +1,72 @@ -from 
typing import List -import requests -from bs4 import BeautifulSoup -import pycountry - -from ..utils.shared import ( - ENCYCLOPAEDIA_METALLUM_LOGGER as LOGGER -) +from typing import List, Optional, Type +from urllib.parse import urlparse +import logging +from ..objects import Source, DatabaseObject from .abstract import Page -from ..database import ( - MusicObject, +from ..objects import ( Artist, Source, SourcePages, Song, Album, - ID3Timestamp, - FormattedText -) -from ..utils import ( - string_processing + Label, + Target ) +from ..connection import Connection +from ..utils.support_classes import DownloadResult +from ..utils.shared import YOUTUBE_LOGGER -INVIDIOUS_INSTANCE = "https://yewtu.be/feed/popular" -class Youtube(Page): - """ - The youtube downloader should use https://invidious.io/ - to make the request. - They are an alternative frontend. +""" +- https://y.com.sb/api/v1/search?q=Zombiez+-+Topic&page=1&date=none&type=channel&duration=none&sort=relevance +- https://y.com.sb/api/v1/channels/playlists/UCV0Ntl3lVR7xDXKoCU6uUXA +- https://y.com.sb/api/v1/playlists/OLAK5uy_kcUBiDv5ATbl-R20OjNaZ5G28XFanQOmM +""" - To find an artist filter for chanel and search for - `{artist.name} - Topic` - and then ofc check for viable results. - - Ofc you can also implement searching songs by isrc. - - NOTE: I didn't look at the invidious api yet. If it sucks, - feel free to use projects like youtube-dl. - But don't implement you're own youtube client. - I don't wanna maintain that shit. 
- """ - API_SESSION: requests.Session = requests.Session() +class YouTube(Page): + # CHANGE SOURCE_TYPE = SourcePages.YOUTUBE + LOGGER = YOUTUBE_LOGGER + + def __init__(self, *args, **kwargs): + self.connection: Connection = Connection( + host="https://www.preset.cum/", + logger=self.LOGGER + ) + + super().__init__(*args, **kwargs) + + def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]: + return super().get_source_type(source) + + def general_search(self, search_query: str) -> List[DatabaseObject]: + return [Artist(name="works")] + + def label_search(self, label: Label) -> List[Label]: + return [] + + def artist_search(self, artist: Artist) -> List[Artist]: + return [] + + def album_search(self, album: Album) -> List[Album]: + return [] + + def song_search(self, song: Song) -> List[Song]: + return [] + + def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song: + return Song() + + def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album: + return Album() + + def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist: + return Artist() + + def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label: + return Label() + + def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult: + return DownloadResult()