diff --git a/src/actual_donwload.py b/src/actual_donwload.py index 08b072d..2c24a28 100644 --- a/src/actual_donwload.py +++ b/src/actual_donwload.py @@ -14,7 +14,8 @@ if __name__ == "__main__": youtube_search = [ "s: #a Zombiez", - "10" + "10", + "8" ] music_kraken.cli(genre="test", command_list=youtube_search) diff --git a/src/music_kraken/connection/connection.py b/src/music_kraken/connection/connection.py index 02eba18..46c22de 100644 --- a/src/music_kraken/connection/connection.py +++ b/src/music_kraken/connection/connection.py @@ -1,3 +1,4 @@ +import time from typing import List, Dict, Callable, Optional, Set from urllib.parse import urlparse, urlunsplit, ParseResult import logging @@ -82,6 +83,7 @@ class Connection: headers: dict, refer_from_origin: bool = True, raw_url: bool = False, + wait_on_403: bool = True, **kwargs ) -> Optional[requests.Response]: if try_count >= self.TRIES: @@ -122,12 +124,15 @@ class Connection: self.LOGGER.warning(f"{self.HOST.netloc} responded wit {r.status_code} " f"at {url}. ({try_count}-{self.TRIES})") self.LOGGER.debug(r.content) + if wait_on_403: + self.LOGGER.warning(f"Waiting for 5 seconds.") + time.sleep(5) self.rotate() return self._request( request=request, - try_count=try_count, + try_count=try_count+1, accepted_response_code=accepted_response_code, url=url, timeout=timeout, diff --git a/src/music_kraken/objects/metadata.py b/src/music_kraken/objects/metadata.py index 3d98d35..488af12 100644 --- a/src/music_kraken/objects/metadata.py +++ b/src/music_kraken/objects/metadata.py @@ -205,6 +205,19 @@ class ID3Timestamp: time_format = self.get_time_format() return time_format, self.date_obj.strftime(time_format) + @classmethod + def fromtimestamp(cls, utc_timestamp: int): + date_obj = datetime.datetime.fromtimestamp(utc_timestamp) + + return cls( + year=date_obj.year, + month=date_obj.month, + day=date_obj.day, + hour=date_obj.hour, + minute=date_obj.minute, + second=date_obj.second + ) + @classmethod def strptime(cls, time_stamp: str, format: str): """ diff --git a/src/music_kraken/pages/youtube.py b/src/music_kraken/pages/youtube.py index f166b4c..94d29ba 100644 --- a/src/music_kraken/pages/youtube.py +++ b/src/music_kraken/pages/youtube.py @@ -1,7 +1,5 @@ -from typing import List, Optional, Type +from typing import List, Optional, Type, Tuple from urllib.parse import urlparse, urlunparse, parse_qs -import logging -from dataclasses import dataclass from enum import Enum from ..objects import Source, DatabaseObject @@ -13,11 +11,13 @@ from ..objects import ( Song, Album, Label, - Target + Target, + FormattedText, + ID3Timestamp ) from ..connection import Connection from ..utils.support_classes import DownloadResult -from ..utils.shared import YOUTUBE_LOGGER, INVIDIOUS_INSTANCE +from ..utils.shared import YOUTUBE_LOGGER, INVIDIOUS_INSTANCE, BITRATE """ @@ -83,14 +83,14 @@ class YouTubeUrl: if "list" not in query_stuff: self.couldnt_find_id(url) else: - self.id = query_stuff["list"] + self.id = query_stuff["list"][0] elif self.url_type == YouTubeUrlType.VIDEO: query_stuff = parse_qs(parsed.query) if "v" not in query_stuff: self.couldnt_find_id(url) else: - self.id = query_stuff["v"] + self.id = query_stuff["v"][0] def couldnt_find_id(self, url: str): @@ -133,6 +133,11 @@ class YouTube(Page): logger=self.LOGGER ) + self.download_connection: Connection = Connection( + host="https://www.youtube.com/", + logger=self.LOGGER + ) + super().__init__(*args, **kwargs) def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]: @@ -147,11 +152,7 @@ class YouTube(Page): return _url_type[parsed.url_type] def general_search(self, search_query: str) -> List[DatabaseObject]: - - return [Artist(name="works")] - - def label_search(self, label: Label) -> List[Label]: - return [] + return self.artist_search(Artist(name=search_query, dynamic=True)) def _json_to_artist(self, artist_json: dict) -> Artist:# return Artist( @@ -168,6 +169,9 @@ class YouTube(Page): artist_list = [] r = self.connection.get(endpoint) + if r is None: + return [] + for search_result in r.json(): if search_result["type"] != "channel": continue @@ -185,11 +189,90 @@ class YouTube(Page): def song_search(self, song: Song) -> List[Song]: return [] + def _fetch_song_from_id(self, youtube_id: str) -> Tuple[Song, Optional[int]]: + # https://yt.artemislena.eu/api/v1/videos/SULFl39UjgY + r = self.connection.get(get_invidious_url(path=f"/api/v1/videos/{youtube_id}")) + if r is None: + return Song(), None + + data = r.json() + if data["genre"] != "Music": + self.LOGGER.warning(f"Genre has to be music, trying anyways") + + title = data["title"] + license_str = None + for music_track in data.get("musicTracks", []): + title = music_track["song"] + license_str = music_track["license"] + + return Song( + title=title, + source_list=[Source( + self.SOURCE_TYPE, get_invidious_url(path="/watch", query=f"v={data['videoId']}") + )], + notes=FormattedText(html=data["descriptionHtml"] + f"\n
{license_str} p>" ) + ), int(data["published"]) + def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song: - return Song() + parsed = YouTubeUrl(source.url) + if parsed.url_type != YouTubeUrlType.VIDEO: + return Song() + + song, _ = self._fetch_song_from_id(parsed.id) + return song def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album: - return Album() + parsed = YouTubeUrl(source.url) + if parsed.url_type != YouTubeUrlType.PLAYLIST: + return Album() + + title = None + source_list = [source] + notes = None + song_list = [] + + # https://yt.artemislena.eu/api/v1/playlists/OLAK5uy_kcUBiDv5ATbl-R20OjNaZ5G28XFanQOmM + r = self.connection.get(get_invidious_url(path=f"/api/v1/playlists/{parsed.id}")) + if r is None: + return Album() + + data = r.json() + if data["type"] != "playlist": + return Album() + + title = data["title"] + notes = FormattedText(html=data["descriptionHtml"]) + + timestamps: List[int] = [] + + """ + TODO + fetch the song and don't get it from there + """ + for video in data["videos"]: + other_song = Song( + source_list=[ + Source( + self.SOURCE_TYPE, get_invidious_url(path="/watch", query=f"v={video['videoId']}") + ) + ], + tracksort=video["index"]+1 + ) + + song, utc_timestamp = self._fetch_song_from_id(video["videoId"]) + song.merge(other_song) + + if utc_timestamp is not None: + timestamps.append(utc_timestamp) + song_list.append(song) + + return Album( + title=title, + source_list=source_list, + notes=notes, + song_list=song_list, + date=ID3Timestamp.fromtimestamp(round(sum(timestamps) / len(timestamps))) + ) def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist: parsed = YouTubeUrl(source.url) @@ -202,6 +285,9 @@ class YouTube(Page): # playlist # https://yt.artemislena.eu/api/v1/channels/playlists/UCV0Ntl3lVR7xDXKoCU6uUXA r = self.connection.get(get_invidious_url(f"/api/v1/channels/playlists/{parsed.id}")) + if r is None: + return Artist() + for playlist_json in r.json()["playlists"]: if playlist_json["type"] != "playlist": continue @@ -224,8 +310,51 @@ class YouTube(Page): return Artist(name=artist_name, main_album_list=album_list, source_list=[source]) - def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label: - return Label() - def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult: - return DownloadResult() + """ + 1. getting the optimal source + Only audio sources allowed + not a bitrate that is smaller than the selected bitrate, but not one that is wayyy huger + + 2. download it + + :param source: + :param target: + :param desc: + :return: + """ + r = self.connection.get(YouTubeUrl(source.url).api) + if r is None: + return DownloadResult(error_message="Api didn't even respond, maybe try another invidious Instance") + + audio_format = None + best_bitrate = 0 + + for possible_format in r.json()["adaptiveFormats"]: + format_type: str = possible_format["type"] + if not format_type.startswith("audio"): + continue + + bitrate = int(possible_format.get("bitrate", 0)) + + if bitrate >= BITRATE: + best_bitrate = bitrate + audio_format = possible_format + break + + if bitrate > best_bitrate: + best_bitrate = bitrate + audio_format = possible_format + + if audio_format is None: + return DownloadResult(error_message="Couldn't find the download link.") + + endpoint = audio_format["url"] + + r = self.download_connection.get(endpoint, stream=True, raw_url=True) + if r is None: + return DownloadResult(error_message=f"Couldn't connect to {endpoint}") + + if target.stream_into(r, desc=desc): + return DownloadResult(total=1) + return DownloadResult(error_message=f"Streaming to the file went wrong: {endpoint}, {str(target.file_path)}")