From fc83ba3d2314239dd094e7844a7f0bc6db75b7b5 Mon Sep 17 00:00:00 2001 From: Hellow <74311245+HeIIow2@users.noreply.github.com> Date: Mon, 31 Jul 2023 19:12:09 +0200 Subject: [PATCH] implemented youtube downloading --- src/music_kraken/pages/youtube.py | 97 +-------- .../pages/youtube_music/super_youtube.py | 199 ++++++++++++++++++ .../pages/youtube_music/youtube_music.py | 24 +-- 3 files changed, 206 insertions(+), 114 deletions(-) create mode 100644 src/music_kraken/pages/youtube_music/super_youtube.py diff --git a/src/music_kraken/pages/youtube.py b/src/music_kraken/pages/youtube.py index 3d63d7c..8b884ee 100644 --- a/src/music_kraken/pages/youtube.py +++ b/src/music_kraken/pages/youtube.py @@ -22,6 +22,8 @@ from ..connection import Connection from ..utils.support_classes import DownloadResult from ..utils.shared import YOUTUBE_LOGGER, INVIDIOUS_INSTANCE, BITRATE, ENABLE_SPONSOR_BLOCK, PIPED_INSTANCE, SLEEP_AFTER_YOUTUBE_403 +from .youtube_music.super_youtube import SuperYouTube, YouTubeUrl, get_invidious_url, YouTubeUrlType + """ - https://yt.artemislena.eu/api/v1/search?q=Zombiez+-+Topic&page=1&date=none&type=channel&duration=none&sort=relevance @@ -31,104 +33,11 @@ from ..utils.shared import YOUTUBE_LOGGER, INVIDIOUS_INSTANCE, BITRATE, ENABLE_S """ -def get_invidious_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str: - return urlunparse((INVIDIOUS_INSTANCE.scheme, INVIDIOUS_INSTANCE.netloc, path, params, query, fragment)) - def get_piped_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str: return urlunparse((PIPED_INSTANCE.scheme, PIPED_INSTANCE.netloc, path, params, query, fragment)) -class YouTubeUrlType(Enum): - CHANNEL = "channel" - PLAYLIST = "playlist" - VIDEO = "watch" - NONE = "" - - -class YouTubeUrl: - """ - Artist - https://yt.artemislena.eu/channel/UCV0Ntl3lVR7xDXKoCU6uUXA - https://www.youtube.com/channel/UCV0Ntl3lVR7xDXKoCU6uUXA - - Release - 
https://yt.artemislena.eu/playlist?list=OLAK5uy_nEg5joAyFjHBPwnS_ADHYtgSqAjFMQKLw - https://www.youtube.com/playlist?list=OLAK5uy_nEg5joAyFjHBPwnS_ADHYtgSqAjFMQKLw - - Track - https://yt.artemislena.eu/watch?v=SULFl39UjgY&list=OLAK5uy_nEg5joAyFjHBPwnS_ADHYtgSqAjFMQKLw&index=1 - https://www.youtube.com/watch?v=SULFl39UjgY - """ - - def __init__(self, url: str) -> None: - """ - Raises Index exception for wrong url, and value error for not found enum type - """ - self.id = "" - parsed = urlparse(url=url) - - self.url_type: YouTubeUrlType - - type_frag_list = parsed.path.split("/") - if len(type_frag_list) < 2: - self.url_type = YouTubeUrlType.NONE - else: - try: - self.url_type = YouTubeUrlType(type_frag_list[1].strip()) - except ValueError: - self.url_type = YouTubeUrlType.NONE - - if self.url_type == YouTubeUrlType.CHANNEL: - if len(type_frag_list) < 3: - self.couldnt_find_id(url) - else: - self.id = type_frag_list[2] - - elif self.url_type == YouTubeUrlType.PLAYLIST: - query_stuff = parse_qs(parsed.query) - if "list" not in query_stuff: - self.couldnt_find_id(url) - else: - self.id = query_stuff["list"][0] - - elif self.url_type == YouTubeUrlType.VIDEO: - query_stuff = parse_qs(parsed.query) - if "v" not in query_stuff: - self.couldnt_find_id(url) - else: - self.id = query_stuff["v"][0] - - - def couldnt_find_id(self, url: str): - YOUTUBE_LOGGER.warning(f"The id is missing: {url}") - self.url_type = YouTubeUrlType.NONE - - @property - def api(self) -> str: - if self.url_type == YouTubeUrlType.CHANNEL: - return get_invidious_url(path=f"/api/v1/channels/playlists/{self.id}") - - if self.url_type == YouTubeUrlType.PLAYLIST: - return get_invidious_url(path=f"/api/v1/playlists/{id}") - - if self.url_type == YouTubeUrlType.VIDEO: - return get_invidious_url(path=f"/api/v1/videos/{self.id}") - - return get_invidious_url() - - @property - def normal(self) -> str: - if self.url_type.CHANNEL: - return get_invidious_url(path=f"/channel/{self.id}") - - if 
def get_invidious_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str:
    """
    Build a url on the configured invidious instance.

    The scheme and netloc are taken from ``INVIDIOUS_INSTANCE``; the remaining
    url components are passed through to ``urlunparse`` unchanged.

    :param path: url path, e.g. ``/watch``
    :param params: parameters of the last path element
    :param query: query string without the leading ``?``
    :param fragment: fragment without the leading ``#``
    :return: the assembled url
    """
    return urlunparse((INVIDIOUS_INSTANCE.scheme, INVIDIOUS_INSTANCE.netloc, path, params, query, fragment))


class YouTubeUrlType(Enum):
    """
    The kind of resource a url points at.

    The values are the first path segment of the matching url
    (``/channel/...``, ``/playlist?...``, ``/watch?...``), which is how
    ``YouTubeUrl`` maps a url onto a member.
    """
    CHANNEL = "channel"
    PLAYLIST = "playlist"
    VIDEO = "watch"
    NONE = ""


class YouTubeUrl:
    """
    Parsed form of a youtube/invidious url: the url type and the id it refers to.

    Artist
    https://yt.artemislena.eu/channel/UCV0Ntl3lVR7xDXKoCU6uUXA
    https://www.youtube.com/channel/UCV0Ntl3lVR7xDXKoCU6uUXA

    Release
    https://yt.artemislena.eu/playlist?list=OLAK5uy_nEg5joAyFjHBPwnS_ADHYtgSqAjFMQKLw
    https://www.youtube.com/playlist?list=OLAK5uy_nEg5joAyFjHBPwnS_ADHYtgSqAjFMQKLw

    Track
    https://yt.artemislena.eu/watch?v=SULFl39UjgY&list=OLAK5uy_nEg5joAyFjHBPwnS_ADHYtgSqAjFMQKLw&index=1
    https://www.youtube.com/watch?v=SULFl39UjgY
    """

    def __init__(self, url: str) -> None:
        """
        Parse ``url`` into ``self.url_type`` and ``self.id``.

        Nothing is raised for urls that can't be interpreted: an unknown first
        path segment, or a known one without an id, results in
        ``YouTubeUrlType.NONE`` and an empty ``id``.

        :param url: the youtube or invidious url to parse
        """
        self.id = ""
        self.url_type: YouTubeUrlType = YouTubeUrlType.NONE

        parsed = urlparse(url)

        # "/channel/<id>" splits into ["", "channel", "<id>"], so index 1 is the type
        type_frag_list = parsed.path.split("/")
        if len(type_frag_list) >= 2:
            try:
                self.url_type = YouTubeUrlType(type_frag_list[1].strip())
            except ValueError:
                self.url_type = YouTubeUrlType.NONE

        if self.url_type == YouTubeUrlType.CHANNEL:
            # the id is the path segment after "channel"; an empty segment is no id
            if len(type_frag_list) < 3 or not type_frag_list[2]:
                self.couldnt_find_id(url)
            else:
                self.id = type_frag_list[2]

        elif self.url_type == YouTubeUrlType.PLAYLIST:
            self._read_id_from_query(url, parsed.query, "list")

        elif self.url_type == YouTubeUrlType.VIDEO:
            self._read_id_from_query(url, parsed.query, "v")

    def _read_id_from_query(self, url: str, query: str, key: str) -> None:
        """Set ``self.id`` from the first value of ``key`` in ``query``, or mark the url as having no id."""
        values = parse_qs(query).get(key)
        if not values:
            self.couldnt_find_id(url)
        else:
            self.id = values[0]

    def couldnt_find_id(self, url: str):
        """Log that ``url`` contains no id and downgrade the url type to ``NONE``."""
        YOUTUBE_LOGGER.warning(f"The id is missing: {url}")
        self.url_type = YouTubeUrlType.NONE

    @property
    def api(self) -> str:
        """
        The invidious api url for this resource.

        Falls back to the bare instance url if the url type is ``NONE``.
        """
        if self.url_type == YouTubeUrlType.CHANNEL:
            return get_invidious_url(path=f"/api/v1/channels/playlists/{self.id}")

        if self.url_type == YouTubeUrlType.PLAYLIST:
            # must be self.id: a bare `id` would format the builtin function into the path
            return get_invidious_url(path=f"/api/v1/playlists/{self.id}")

        if self.url_type == YouTubeUrlType.VIDEO:
            return get_invidious_url(path=f"/api/v1/videos/{self.id}")

        return get_invidious_url()

    @property
    def normal(self) -> str:
        """
        The human facing url of this resource on the invidious instance.

        Falls back to the bare instance url if the url type is ``NONE``.
        """
        # compare against the members; `self.url_type.CHANNEL` is an attribute
        # lookup, not a comparison, and would select this branch for every url
        if self.url_type == YouTubeUrlType.CHANNEL:
            return get_invidious_url(path=f"/channel/{self.id}")

        if self.url_type == YouTubeUrlType.PLAYLIST:
            return get_invidious_url(path="/playlist", query=f"list={self.id}")

        if self.url_type == YouTubeUrlType.VIDEO:
            return get_invidious_url(path="/watch", query=f"v={self.id}")

        return get_invidious_url()
class SuperYouTube(Page):
    """
    Common base of the youtube pages: downloads the audio of a video through
    the invidious api and looks up sponsorblock segments for it.

    NOTE(review): ``download_song_to_target`` reads ``self.connection``, which
    is not set in this class; presumably the subclasses create it, confirm.
    """
    # CHANGE
    SOURCE_TYPE = SourcePages.YOUTUBE
    LOGGER = YOUTUBE_LOGGER

    NO_ADDITIONAL_DATA_FROM_SONG = True

    def __init__(self, *args, **kwargs):
        """
        Create the connection used for downloading and the sponsorblock client.

        NOTE(review): ``args``/``kwargs`` are accepted but not forwarded to
        ``Page.__init__``; confirm whether the base class needs them.
        """
        self.download_connection: Connection = Connection(
            host="https://www.youtube.com/",
            logger=self.LOGGER,
            sleep_after_404=SLEEP_AFTER_YOUTUBE_403
        )

        # sponsorblock gets the session of one of our own connections,
        # so that it uses the same proxies as the rest of the program
        _sponsorblock_connection: Connection = Connection(host="https://sponsor.ajay.app/")
        self.sponsorblock_client = sponsorblock.Client(session=_sponsorblock_connection.session)

    @staticmethod
    def _pick_audio_format(formats, target_bitrate) -> Optional[dict]:
        """
        Return the first audio format whose bitrate reaches ``target_bitrate``,
        otherwise the audio format with the highest bitrate, otherwise ``None``.

        Entries without a usable ``type`` or ``bitrate`` are treated as
        "not audio" and "bitrate 0" respectively instead of raising.
        """
        fallback = None
        fallback_bitrate = 0

        for candidate in formats:
            if not str(candidate.get("type", "")).startswith("audio"):
                continue

            try:
                bitrate = int(candidate.get("bitrate", 0))
            except (TypeError, ValueError):
                bitrate = 0

            # NOTE(review): assumes the api bitrate and BITRATE share a unit - confirm
            if bitrate >= target_bitrate:
                return candidate

            if bitrate > fallback_bitrate:
                fallback_bitrate = bitrate
                fallback = candidate

        return fallback

    def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
        """
        1. getting the optimal source
        Only audio sources are allowed.
        The first one reaching the selected bitrate is taken; if there is none,
        the one with the highest bitrate is taken.

        2. download it

        :param source: the source whose url points at the video
        :param target: where the audio is streamed into
        :param desc: description passed on to the download connection
        :return: the result of the download, or a result carrying an error message
        """
        r = self.connection.get(YouTubeUrl(source.url).api)
        if r is None:
            return DownloadResult(error_message="Api didn't even respond, maybe try another invidious Instance")

        try:
            data = r.json()
        except ValueError:
            # the instance answered, but not with json (e.g. an html error page)
            return DownloadResult(error_message="Api didn't return valid json, maybe try another invidious Instance")

        # an error response has no "adaptiveFormats"; treat that as "no format found"
        formats = data.get("adaptiveFormats") if isinstance(data, dict) else None
        audio_format = self._pick_audio_format(formats or [], BITRATE)

        if audio_format is None or "url" not in audio_format:
            return DownloadResult(error_message="Couldn't find the download link.")

        endpoint = audio_format["url"]

        return self.download_connection.stream_into(endpoint, target, description=desc, raw_url=True)

    def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]:
        """
        Fetch the sponsorblock segments of the video behind ``source``.

        :param song: not used by this implementation
        :param source: the source whose url points at the video
        :return: (start, end) of every segment; empty if sponsorblock is
            disabled, the url is no video url, or the lookup failed
        """
        if not ENABLE_SPONSOR_BLOCK:
            return []

        parsed = YouTubeUrl(source.url)
        if parsed.url_type != YouTubeUrlType.VIDEO:
            self.LOGGER.warning(f"{source.url} is no video url.")
            return []

        segments = []
        try:
            segments = self.sponsorblock_client.get_skip_segments(parsed.id)
        except NotFoundException:
            self.LOGGER.debug(f"No sponsor found for the video {parsed.id}.")
        except HTTPException as e:
            self.LOGGER.warning(f"{e}")

        return [(segment.start, segment.end) for segment in segments]
Target, desc: str = None) -> DownloadResult: - return DownloadResult()