diff --git a/src/music_kraken/objects/target.py b/src/music_kraken/objects/target.py index 7fae49b..8c70755 100644 --- a/src/music_kraken/objects/target.py +++ b/src/music_kraken/objects/target.py @@ -98,3 +98,6 @@ class Target(DatabaseObject): except requests.exceptions.Timeout: shared.DOWNLOAD_LOGGER.error("Stream timed out.") return False + + def delete(self): + self.file_path.unlink(missing_ok=True) diff --git a/src/music_kraken/pages/abstract.py b/src/music_kraken/pages/abstract.py index 80be169..31b7dd2 100644 --- a/src/music_kraken/pages/abstract.py +++ b/src/music_kraken/pages/abstract.py @@ -257,32 +257,31 @@ class Page(threading.Thread): fill_naming_objects(music_object) - self._download(music_object, {}, genre, download_all) - - return DownloadResult() + return self._download(music_object, {}, genre, download_all) def _download(self, music_object: DatabaseObject, naming_objects: Dict[Type[DatabaseObject], DatabaseObject], download_all: bool = False) -> list: # Skips all releases, that are defined in shared.ALBUM_TYPE_BLACKLIST, if download_all is False if isinstance(music_object, Album): if not download_all and music_object.album_type in shared.ALBUM_TYPE_BLACKLIST: - return [] + return DownloadResult() + self.fetch_details(music_object=music_object, stop_at_level=2) naming_objects[type(music_object)] = music_object if isinstance(music_object, Song): - return [self._download_song(music_object, naming_objects)] + return self._download_song(music_object, naming_objects) - return_values: List = [] + download_result: DownloadResult = DownloadResult() for collection_name in music_object.DOWNWARDS_COLLECTION_ATTRIBUTES: collection: Collection = getattr(self, collection_name) sub_ordered_music_object: DatabaseObject for sub_ordered_music_object in collection: - return_values.extend(self._download(sub_ordered_music_object, naming_objects.copy(), download_all)) + download_result.merge(self._download(sub_ordered_music_object, naming_objects.copy(), download_all)) - return return_values + return download_result def _download_song(self, song: Song, naming_objects: Dict[Type[DatabaseObject], DatabaseObject]): name_attribute = DEFAULT_VALUES.copy() @@ -309,246 +308,27 @@ class Page(threading.Thread): path=DOWNLOAD_PATH.format(**name_attribute), file=DOWNLOAD_FILE.format(**name_attribute) ) - pass - - @classmethod - def download( - cls, - music_object: Union[Song, Album, Artist, Label], - download_features: bool = True, - default_target: DefaultTarget = None, - genre: str = None, - override_existing: bool = False, - create_target_on_demand: bool = True, - download_all: bool = False, - exclude_album_type: Set[AlbumType] = shared.ALBUM_TYPE_BLACKLIST - ) -> DownloadResult: - """ - - :param genre: The downloader will download to THIS folder (set the value of default_target.genre to genre) - :param music_object: - :param download_features: - :param default_target: - :param override_existing: - :param create_target_on_demand: - :param download_all: - :param exclude_album_type: - :return total downloads, failed_downloads: - """ - if default_target is None: - default_target = DefaultTarget() - - if download_all: - exclude_album_type: Set[AlbumType] = set() - elif exclude_album_type is None: - exclude_album_type = { - AlbumType.COMPILATION_ALBUM, - AlbumType.LIVE_ALBUM, - AlbumType.MIXTAPE - } - - if type(music_object) is Song: - return cls.download_song( - music_object, - override_existing=override_existing, - create_target_on_demand=create_target_on_demand, - genre=genre - ) - if type(music_object) is Album: - return cls.download_album( - music_object, - default_target=default_target, - override_existing=override_existing, - genre=genre - ) - if type(music_object) is Artist: - return cls.download_artist( - music_object, - default_target=default_target, - download_features=download_features, - exclude_album_type=exclude_album_type, - genre=genre - ) - if type(music_object) is Label: - return cls.download_label( - music_object, - download_features=download_features, - default_target=default_target, - exclude_album_type=exclude_album_type, - genre=genre - ) - - return DownloadResult(error_message=f"{type(music_object)} can't be downloaded.") - - @classmethod - def download_label( - cls, - label: Label, - exclude_album_type: Set[AlbumType], - download_features: bool = True, - override_existing: bool = False, - default_target: DefaultTarget = None, - genre: str = None - ) -> DownloadResult: - - default_target = DefaultTarget() if default_target is None else copy(default_target) - default_target.label_object(label) - - r = DownloadResult() - - cls.fetch_details(label) - for artist in label.current_artist_collection: - r.merge(cls.download_artist( - artist, - download_features=download_features, - override_existing=override_existing, - default_target=default_target, - exclude_album_type=exclude_album_type, - genre=genre - )) - - album: Album - for album in label.album_collection: - if album.album_type == AlbumType.OTHER: - cls.fetch_details(album) - - if album.album_type in exclude_album_type: - cls.LOGGER.info(f"Skipping {album.option_string} due to the filter. ({album.album_type})") - continue - - r.merge(cls.download_album( - album, - override_existing=override_existing, - default_target=default_target, - genre=genre - )) - - return r - - @classmethod - def download_artist( - cls, - artist: Artist, - exclude_album_type: Set[AlbumType], - download_features: bool = True, - override_existing: bool = False, - default_target: DefaultTarget = None, - genre: str = None - ) -> DownloadResult: - - default_target = DefaultTarget() if default_target is None else copy(default_target) - default_target.artist_object(artist) - - r = DownloadResult() - - cls.fetch_details(artist) - - album: Album - for album in artist.main_album_collection: - if album.album_type in exclude_album_type: - cls.LOGGER.info(f"Skipping {album.option_string} due to the filter. ({album.album_type})") - continue - - r.merge(cls.download_album( - album, - override_existing=override_existing, - default_target=default_target, - genre=genre - )) - - if download_features: - for song in artist.feature_album.song_collection: - r.merge(cls.download_song( - song, - override_existing=override_existing, - default_target=default_target, - genre=genre - )) - - return r - - @classmethod - def download_album( - cls, - album: Album, - override_existing: bool = False, - default_target: DefaultTarget = None, - genre: str = None - ) -> DownloadResult: - - default_target = DefaultTarget() if default_target is None else copy(default_target) - default_target.album_object(album) - - r = DownloadResult() - - cls.fetch_details(album) - - album.update_tracksort() - - cls.LOGGER.info(f"downloading album: {album.title}") - for song in album.song_collection: - r.merge(cls.download_song( - song, - override_existing=override_existing, - default_target=default_target, - genre=genre - )) - - return r - - @classmethod - def download_song( - cls, - song: Song, - override_existing: bool = False, - create_target_on_demand: bool = True, - default_target: DefaultTarget = None, - genre: str = None - ) -> DownloadResult: - cls.LOGGER.debug(f"Setting genre of {song.option_string} to {genre}") - song.genre = genre - - default_target = DefaultTarget() if default_target is None else copy(default_target) - default_target.song_object(song) - - cls.fetch_details(song) - + if song.target_collection.empty: - if create_target_on_demand and not song.main_artist_collection.empty and not song.album_collection.empty: - song.target_collection.append(default_target.target) - else: - return DownloadResult(error_message=f"No target exists for {song.title}, but create_target_on_demand is False.") - - target: Target - if any(target.exists for target in song.target_collection) and not override_existing: - r = DownloadResult(total=1, fail=0) - - existing_target: Target - for existing_target in song.target_collection: - if existing_target.exists: - r.merge(cls._post_process_targets(song=song, temp_target=existing_target)) - break - - return r - - sources = song.source_collection.get_sources_from_page(cls.SOURCE_TYPE) + song.target_collection.append(new_target) + + sources = song.source_collection.get_sources_from_page(self.SOURCE_TYPE) if len(sources) == 0: - return DownloadResult(error_message=f"No source found for {song.title} as {cls.__name__}.") + return DownloadResult(error_message=f"No source found for {song.title} as {self.__class__.__name__}.") temp_target: Target = Target( path=shared.TEMP_DIR, file=str(random.randint(0, 999999)) ) - - r = cls._download_song_to_targets(source=sources[0], target=temp_target, desc=song.title) + + r = self._download_song_to_targets(source=sources[0], target=temp_target, desc=song.title) if not r.is_fatal_error: - r.merge(cls._post_process_targets(song, temp_target)) + r.merge(self._post_process_targets(song, temp_target)) return r - - @classmethod - def _post_process_targets(cls, song: Song, temp_target: Target) -> DownloadResult: + + def _post_process_targets(self, song: Song, temp_target: Target) -> DownloadResult: correct_codec(temp_target) write_metadata_to_target(song.metadata, temp_target) @@ -559,9 +339,10 @@ class Page(threading.Thread): if temp_target is not target: temp_target.copy_content(target) r.add_target(target) + + temp_target.delete() return r - - @classmethod - def _download_song_to_targets(cls, source: Source, target: Target, desc: str = None) -> DownloadResult: + + def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult: return DownloadResult() diff --git a/src/music_kraken/pages/musify.py b/src/music_kraken/pages/musify.py index 4b6b390..e4dea1a 100644 --- a/src/music_kraken/pages/musify.py +++ b/src/music_kraken/pages/musify.py @@ -567,7 +567,7 @@ class Musify(Page): if stop_at_level > 1: song: Song for song in album.song_collection: - sources = song.source_collection.get_sources_from_page(cls.SOURCE_TYPE) + sources = song.source_collection.get_sources_from_page(self.SOURCE_TYPE) for source in sources: song.merge(self.fetch_song(source=source)) @@ -997,61 +997,8 @@ class Musify(Page): def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label: return Label() - - -class OldMusify(Page): - API_SESSION: requests.Session = requests.Session() - API_SESSION.headers = { - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:106.0) Gecko/20100101 Firefox/106.0", - "Connection": "keep-alive", - "Referer": "https://musify.club/" - } - API_SESSION.proxies = shared.proxies - TIMEOUT = 7 - POST_TIMEOUT = 15 - TRIES = 5 - HOST = "https://musify.club" - - CONNECTION = Connection( - host="https://musify.club/", - logger=MUSIFY_LOGGER - ) - - SOURCE_TYPE = SourcePages.MUSIFY - LOGGER = shared.MUSIFY_LOGGER - - - - - - @classmethod - def get_plaintext_query(cls, query: Query) -> str: - if query.album is None: - return f"{query.artist or '*'} - {query.song or '*'}" - return f"{query.artist or '*'} - {query.album or '*'} - {query.song or '*'}" - - - - - - - - - @classmethod - def _get_type_of_url(cls, url: str) -> Optional[Union[Type[Song], Type[Album], Type[Artist], Type[Label]]]: - url: MusifyUrl = cls.parse_url(url) - - if url.source_type == MusifyTypes.ARTIST: - return Artist - if url.source_type == MusifyTypes.RELEASE: - return Album - if url.source_type == MusifyTypes.SONG: - return Song - return None - - @classmethod - def _download_song_to_targets(cls, source: Source, target: Target, desc: str = None) -> DownloadResult: + def download_song_to_targets(self, source: Source, target: Target, desc: str = None) -> DownloadResult: """ https://musify.club/track/im-in-a-coffin-life-never-was-waste-of-skin-16360302 https://musify.club/track/dl/16360302/im-in-a-coffin-life-never-was-waste-of-skin.mp3 @@ -1059,15 +1006,15 @@ class OldMusify(Page): endpoint = source.audio_url if source.audio_url is None: - url: MusifyUrl = cls.parse_url(source.url) + url: MusifyUrl = parse_url(source.url) if url.source_type != MusifyTypes.SONG: return DownloadResult(error_message=f"The url is not of the type Song: {source.url}") endpoint = f"https://musify.club/track/dl/{url.musify_id}/{url.name_without_id}.mp3" - cls.LOGGER.warning(f"The source has no audio link. Falling back to {endpoint}.") + self.LOGGER.warning(f"The source has no audio link. Falling back to {endpoint}.") - r = cls.CONNECTION.get(endpoint, stream=True, raw_url=True) + r = self.connection.get(endpoint, stream=True, raw_url=True) if r is None: return DownloadResult(error_message=f"couldn't connect to {endpoint}")