implemented youtube downloading
This commit is contained in:
		@@ -22,6 +22,8 @@ from ..connection import Connection
 | 
			
		||||
from ..utils.support_classes import DownloadResult
 | 
			
		||||
from ..utils.shared import YOUTUBE_LOGGER, INVIDIOUS_INSTANCE, BITRATE, ENABLE_SPONSOR_BLOCK, PIPED_INSTANCE, SLEEP_AFTER_YOUTUBE_403
 | 
			
		||||
 | 
			
		||||
from .youtube_music.super_youtube import SuperYouTube, YouTubeUrl, get_invidious_url, YouTubeUrlType
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
- https://yt.artemislena.eu/api/v1/search?q=Zombiez+-+Topic&page=1&date=none&type=channel&duration=none&sort=relevance
 | 
			
		||||
@@ -31,104 +33,11 @@ from ..utils.shared import YOUTUBE_LOGGER, INVIDIOUS_INSTANCE, BITRATE, ENABLE_S
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_invidious_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str:
 | 
			
		||||
    return urlunparse((INVIDIOUS_INSTANCE.scheme, INVIDIOUS_INSTANCE.netloc, path, params, query, fragment))
 | 
			
		||||
 | 
			
		||||
def get_piped_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str:
 | 
			
		||||
    return urlunparse((PIPED_INSTANCE.scheme, PIPED_INSTANCE.netloc, path, params, query, fragment))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class YouTubeUrlType(Enum):
 | 
			
		||||
    CHANNEL = "channel"
 | 
			
		||||
    PLAYLIST = "playlist"
 | 
			
		||||
    VIDEO = "watch"
 | 
			
		||||
    NONE = ""
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class YouTubeUrl:
 | 
			
		||||
    """
 | 
			
		||||
    Artist
 | 
			
		||||
    https://yt.artemislena.eu/channel/UCV0Ntl3lVR7xDXKoCU6uUXA
 | 
			
		||||
    https://www.youtube.com/channel/UCV0Ntl3lVR7xDXKoCU6uUXA
 | 
			
		||||
    
 | 
			
		||||
    Release
 | 
			
		||||
    https://yt.artemislena.eu/playlist?list=OLAK5uy_nEg5joAyFjHBPwnS_ADHYtgSqAjFMQKLw
 | 
			
		||||
    https://www.youtube.com/playlist?list=OLAK5uy_nEg5joAyFjHBPwnS_ADHYtgSqAjFMQKLw
 | 
			
		||||
    
 | 
			
		||||
    Track
 | 
			
		||||
    https://yt.artemislena.eu/watch?v=SULFl39UjgY&list=OLAK5uy_nEg5joAyFjHBPwnS_ADHYtgSqAjFMQKLw&index=1
 | 
			
		||||
    https://www.youtube.com/watch?v=SULFl39UjgY
 | 
			
		||||
    """
 | 
			
		||||
    
 | 
			
		||||
    def __init__(self, url: str) -> None:
 | 
			
		||||
        """
 | 
			
		||||
        Raises Index exception for wrong url, and value error for not found enum type
 | 
			
		||||
        """
 | 
			
		||||
        self.id = ""
 | 
			
		||||
        parsed = urlparse(url=url)
 | 
			
		||||
        
 | 
			
		||||
        self.url_type: YouTubeUrlType
 | 
			
		||||
        
 | 
			
		||||
        type_frag_list = parsed.path.split("/")
 | 
			
		||||
        if len(type_frag_list) < 2:
 | 
			
		||||
            self.url_type = YouTubeUrlType.NONE
 | 
			
		||||
        else:
 | 
			
		||||
            try:
 | 
			
		||||
                self.url_type = YouTubeUrlType(type_frag_list[1].strip())
 | 
			
		||||
            except ValueError:
 | 
			
		||||
                self.url_type = YouTubeUrlType.NONE
 | 
			
		||||
                
 | 
			
		||||
        if self.url_type == YouTubeUrlType.CHANNEL:
 | 
			
		||||
            if len(type_frag_list) < 3:
 | 
			
		||||
                self.couldnt_find_id(url)
 | 
			
		||||
            else:
 | 
			
		||||
                self.id = type_frag_list[2]
 | 
			
		||||
        
 | 
			
		||||
        elif self.url_type == YouTubeUrlType.PLAYLIST:
 | 
			
		||||
            query_stuff = parse_qs(parsed.query)
 | 
			
		||||
            if "list" not in query_stuff:
 | 
			
		||||
                self.couldnt_find_id(url)
 | 
			
		||||
            else:
 | 
			
		||||
                self.id = query_stuff["list"][0]
 | 
			
		||||
        
 | 
			
		||||
        elif self.url_type == YouTubeUrlType.VIDEO:
 | 
			
		||||
            query_stuff = parse_qs(parsed.query)
 | 
			
		||||
            if "v" not in query_stuff:
 | 
			
		||||
                self.couldnt_find_id(url)
 | 
			
		||||
            else:
 | 
			
		||||
                self.id = query_stuff["v"][0]
 | 
			
		||||
            
 | 
			
		||||
        
 | 
			
		||||
    def couldnt_find_id(self, url: str):
 | 
			
		||||
        YOUTUBE_LOGGER.warning(f"The id is missing: {url}")
 | 
			
		||||
        self.url_type = YouTubeUrlType.NONE
 | 
			
		||||
        
 | 
			
		||||
    @property
 | 
			
		||||
    def api(self) -> str:
 | 
			
		||||
        if self.url_type == YouTubeUrlType.CHANNEL:
 | 
			
		||||
            return get_invidious_url(path=f"/api/v1/channels/playlists/{self.id}")
 | 
			
		||||
        
 | 
			
		||||
        if self.url_type == YouTubeUrlType.PLAYLIST:
 | 
			
		||||
            return get_invidious_url(path=f"/api/v1/playlists/{id}")
 | 
			
		||||
        
 | 
			
		||||
        if self.url_type == YouTubeUrlType.VIDEO:
 | 
			
		||||
            return get_invidious_url(path=f"/api/v1/videos/{self.id}")
 | 
			
		||||
        
 | 
			
		||||
        return get_invidious_url()
 | 
			
		||||
            
 | 
			
		||||
    @property
 | 
			
		||||
    def normal(self) -> str:
 | 
			
		||||
        if self.url_type.CHANNEL:
 | 
			
		||||
            return get_invidious_url(path=f"/channel/{self.id}")
 | 
			
		||||
        
 | 
			
		||||
        if self.url_type.PLAYLIST:
 | 
			
		||||
            return get_invidious_url(path="/playlist", query=f"list={self.id}")
 | 
			
		||||
        
 | 
			
		||||
        if self.url_type.VIDEO:
 | 
			
		||||
            return get_invidious_url(path="/watch", query=f"v={self.id}")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class YouTube(Page):
 | 
			
		||||
class YouTube(SuperYouTube):
 | 
			
		||||
    # CHANGE
 | 
			
		||||
    SOURCE_TYPE = SourcePages.YOUTUBE
 | 
			
		||||
    LOGGER = YOUTUBE_LOGGER
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										199
									
								
								src/music_kraken/pages/youtube_music/super_youtube.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										199
									
								
								src/music_kraken/pages/youtube_music/super_youtube.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,199 @@
 | 
			
		||||
from typing import List, Optional, Type, Tuple
 | 
			
		||||
from urllib.parse import urlparse, urlunparse, parse_qs
 | 
			
		||||
from enum import Enum
 | 
			
		||||
 | 
			
		||||
import sponsorblock
 | 
			
		||||
from sponsorblock.errors import HTTPException, NotFoundException
 | 
			
		||||
 | 
			
		||||
from ...objects import Source, DatabaseObject, Song, Target
 | 
			
		||||
from ..abstract import Page
 | 
			
		||||
from ...objects import (
 | 
			
		||||
    Artist,
 | 
			
		||||
    Source,
 | 
			
		||||
    SourcePages,
 | 
			
		||||
    Song,
 | 
			
		||||
    Album,
 | 
			
		||||
    Label,
 | 
			
		||||
    Target,
 | 
			
		||||
    FormattedText,
 | 
			
		||||
    ID3Timestamp
 | 
			
		||||
)
 | 
			
		||||
from ...connection import Connection
 | 
			
		||||
from ...utils.support_classes import DownloadResult
 | 
			
		||||
from ...utils.shared import YOUTUBE_LOGGER, INVIDIOUS_INSTANCE, BITRATE, ENABLE_SPONSOR_BLOCK, PIPED_INSTANCE, SLEEP_AFTER_YOUTUBE_403
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_invidious_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str:
 | 
			
		||||
    return urlunparse((INVIDIOUS_INSTANCE.scheme, INVIDIOUS_INSTANCE.netloc, path, params, query, fragment))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class YouTubeUrlType(Enum):
 | 
			
		||||
    CHANNEL = "channel"
 | 
			
		||||
    PLAYLIST = "playlist"
 | 
			
		||||
    VIDEO = "watch"
 | 
			
		||||
    NONE = ""
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class YouTubeUrl:
 | 
			
		||||
    """
 | 
			
		||||
    Artist
 | 
			
		||||
    https://yt.artemislena.eu/channel/UCV0Ntl3lVR7xDXKoCU6uUXA
 | 
			
		||||
    https://www.youtube.com/channel/UCV0Ntl3lVR7xDXKoCU6uUXA
 | 
			
		||||
    
 | 
			
		||||
    Release
 | 
			
		||||
    https://yt.artemislena.eu/playlist?list=OLAK5uy_nEg5joAyFjHBPwnS_ADHYtgSqAjFMQKLw
 | 
			
		||||
    https://www.youtube.com/playlist?list=OLAK5uy_nEg5joAyFjHBPwnS_ADHYtgSqAjFMQKLw
 | 
			
		||||
    
 | 
			
		||||
    Track
 | 
			
		||||
    https://yt.artemislena.eu/watch?v=SULFl39UjgY&list=OLAK5uy_nEg5joAyFjHBPwnS_ADHYtgSqAjFMQKLw&index=1
 | 
			
		||||
    https://www.youtube.com/watch?v=SULFl39UjgY
 | 
			
		||||
    """
 | 
			
		||||
    
 | 
			
		||||
    def __init__(self, url: str) -> None:
 | 
			
		||||
        """
 | 
			
		||||
        Raises Index exception for wrong url, and value error for not found enum type
 | 
			
		||||
        """
 | 
			
		||||
        self.id = ""
 | 
			
		||||
        parsed = urlparse(url=url)
 | 
			
		||||
        
 | 
			
		||||
        self.url_type: YouTubeUrlType
 | 
			
		||||
        
 | 
			
		||||
        type_frag_list = parsed.path.split("/")
 | 
			
		||||
        if len(type_frag_list) < 2:
 | 
			
		||||
            self.url_type = YouTubeUrlType.NONE
 | 
			
		||||
        else:
 | 
			
		||||
            try:
 | 
			
		||||
                self.url_type = YouTubeUrlType(type_frag_list[1].strip())
 | 
			
		||||
            except ValueError:
 | 
			
		||||
                self.url_type = YouTubeUrlType.NONE
 | 
			
		||||
                
 | 
			
		||||
        if self.url_type == YouTubeUrlType.CHANNEL:
 | 
			
		||||
            if len(type_frag_list) < 3:
 | 
			
		||||
                self.couldnt_find_id(url)
 | 
			
		||||
            else:
 | 
			
		||||
                self.id = type_frag_list[2]
 | 
			
		||||
        
 | 
			
		||||
        elif self.url_type == YouTubeUrlType.PLAYLIST:
 | 
			
		||||
            query_stuff = parse_qs(parsed.query)
 | 
			
		||||
            if "list" not in query_stuff:
 | 
			
		||||
                self.couldnt_find_id(url)
 | 
			
		||||
            else:
 | 
			
		||||
                self.id = query_stuff["list"][0]
 | 
			
		||||
        
 | 
			
		||||
        elif self.url_type == YouTubeUrlType.VIDEO:
 | 
			
		||||
            query_stuff = parse_qs(parsed.query)
 | 
			
		||||
            if "v" not in query_stuff:
 | 
			
		||||
                self.couldnt_find_id(url)
 | 
			
		||||
            else:
 | 
			
		||||
                self.id = query_stuff["v"][0]
 | 
			
		||||
            
 | 
			
		||||
        
 | 
			
		||||
    def couldnt_find_id(self, url: str):
 | 
			
		||||
        YOUTUBE_LOGGER.warning(f"The id is missing: {url}")
 | 
			
		||||
        self.url_type = YouTubeUrlType.NONE
 | 
			
		||||
        
 | 
			
		||||
    @property
 | 
			
		||||
    def api(self) -> str:
 | 
			
		||||
        if self.url_type == YouTubeUrlType.CHANNEL:
 | 
			
		||||
            return get_invidious_url(path=f"/api/v1/channels/playlists/{self.id}")
 | 
			
		||||
        
 | 
			
		||||
        if self.url_type == YouTubeUrlType.PLAYLIST:
 | 
			
		||||
            return get_invidious_url(path=f"/api/v1/playlists/{id}")
 | 
			
		||||
        
 | 
			
		||||
        if self.url_type == YouTubeUrlType.VIDEO:
 | 
			
		||||
            return get_invidious_url(path=f"/api/v1/videos/{self.id}")
 | 
			
		||||
        
 | 
			
		||||
        return get_invidious_url()
 | 
			
		||||
            
 | 
			
		||||
    @property
 | 
			
		||||
    def normal(self) -> str:
 | 
			
		||||
        if self.url_type.CHANNEL:
 | 
			
		||||
            return get_invidious_url(path=f"/channel/{self.id}")
 | 
			
		||||
        
 | 
			
		||||
        if self.url_type.PLAYLIST:
 | 
			
		||||
            return get_invidious_url(path="/playlist", query=f"list={self.id}")
 | 
			
		||||
        
 | 
			
		||||
        if self.url_type.VIDEO:
 | 
			
		||||
            return get_invidious_url(path="/watch", query=f"v={self.id}")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SuperYouTube(Page):
 | 
			
		||||
    # CHANGE
 | 
			
		||||
    SOURCE_TYPE = SourcePages.YOUTUBE
 | 
			
		||||
    LOGGER = YOUTUBE_LOGGER
 | 
			
		||||
 | 
			
		||||
    NO_ADDITIONAL_DATA_FROM_SONG = True
 | 
			
		||||
 | 
			
		||||
    def __init__(self, *args, **kwargs):
 | 
			
		||||
        self.download_connection: Connection = Connection(
 | 
			
		||||
            host="https://www.youtube.com/",
 | 
			
		||||
            logger=self.LOGGER,
 | 
			
		||||
            sleep_after_404=SLEEP_AFTER_YOUTUBE_403
 | 
			
		||||
        )
 | 
			
		||||
        
 | 
			
		||||
        # the stuff with the connection is, to ensure sponsorblock uses the proxies, my programm does
 | 
			
		||||
        _sponsorblock_connection: Connection = Connection(host="https://sponsor.ajay.app/")
 | 
			
		||||
        self.sponsorblock_client = sponsorblock.Client(session=_sponsorblock_connection.session)
 | 
			
		||||
 | 
			
		||||
    def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
 | 
			
		||||
        """
 | 
			
		||||
        1. getting the optimal source
 | 
			
		||||
        Only audio sources allowed
 | 
			
		||||
        not a bitrate that is smaller than the selected bitrate, but not one that is wayyy huger
 | 
			
		||||
 | 
			
		||||
        2. download it
 | 
			
		||||
 | 
			
		||||
        :param source:
 | 
			
		||||
        :param target:
 | 
			
		||||
        :param desc:
 | 
			
		||||
        :return:
 | 
			
		||||
        """
 | 
			
		||||
        r = self.connection.get(YouTubeUrl(source.url).api)
 | 
			
		||||
        if r is None:
 | 
			
		||||
            return DownloadResult(error_message="Api didn't even respond, maybe try another invidious Instance")
 | 
			
		||||
 | 
			
		||||
        audio_format = None
 | 
			
		||||
        best_bitrate = 0
 | 
			
		||||
 | 
			
		||||
        for possible_format in r.json()["adaptiveFormats"]:
 | 
			
		||||
            format_type: str = possible_format["type"]
 | 
			
		||||
            if not format_type.startswith("audio"):
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
            bitrate = int(possible_format.get("bitrate", 0))
 | 
			
		||||
 | 
			
		||||
            if bitrate >= BITRATE:
 | 
			
		||||
                best_bitrate = bitrate
 | 
			
		||||
                audio_format = possible_format
 | 
			
		||||
                break
 | 
			
		||||
 | 
			
		||||
            if bitrate > best_bitrate:
 | 
			
		||||
                best_bitrate = bitrate
 | 
			
		||||
                audio_format = possible_format
 | 
			
		||||
 | 
			
		||||
        if audio_format is None:
 | 
			
		||||
            return DownloadResult(error_message="Couldn't find the download link.")
 | 
			
		||||
 | 
			
		||||
        endpoint = audio_format["url"]
 | 
			
		||||
 | 
			
		||||
        return self.download_connection.stream_into(endpoint, target, description=desc, raw_url=True)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def get_skip_intervals(self, song: Song, source: Source) -> List[Tuple[float, float]]:
 | 
			
		||||
        if not ENABLE_SPONSOR_BLOCK:
 | 
			
		||||
            return []
 | 
			
		||||
        
 | 
			
		||||
        parsed = YouTubeUrl(source.url)
 | 
			
		||||
        if parsed.url_type != YouTubeUrlType.VIDEO:
 | 
			
		||||
            self.LOGGER.warning(f"{source.url} is no video url.")
 | 
			
		||||
            return []
 | 
			
		||||
        
 | 
			
		||||
        segments = []
 | 
			
		||||
        try:
 | 
			
		||||
            segments = self.sponsorblock_client.get_skip_segments(parsed.id)
 | 
			
		||||
        except NotFoundException:
 | 
			
		||||
            self.LOGGER.debug(f"No sponsor found for the video {parsed.id}.")
 | 
			
		||||
        except HTTPException as e:
 | 
			
		||||
            self.LOGGER.warning(f"{e}")
 | 
			
		||||
 | 
			
		||||
        return [(segment.start, segment.end) for segment in segments]
 | 
			
		||||
@@ -28,6 +28,7 @@ from ...connection import Connection
 | 
			
		||||
from ...utils.support_classes import DownloadResult
 | 
			
		||||
 | 
			
		||||
from ._list_render import parse_renderer
 | 
			
		||||
from .super_youtube import SuperYouTube
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_youtube_url(path: str = "", params: str = "", query: str = "", fragment: str = "") -> str:
 | 
			
		||||
@@ -90,9 +91,9 @@ class YouTubeMusicCredentials:
 | 
			
		||||
    context: dict
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class YoutubeMusic(Page):
 | 
			
		||||
class YoutubeMusic(SuperYouTube):
 | 
			
		||||
    # CHANGE
 | 
			
		||||
    SOURCE_TYPE = SourcePages.YOUTUBE
 | 
			
		||||
    SOURCE_TYPE = SourcePages.YOUTUBE_MUSIC
 | 
			
		||||
    LOGGER = YOUTUBE_MUSIC_LOGGER
 | 
			
		||||
 | 
			
		||||
    def __init__(self, *args, **kwargs):
 | 
			
		||||
@@ -276,17 +277,6 @@ class YoutubeMusic(Page):
 | 
			
		||||
 | 
			
		||||
        return results
 | 
			
		||||
 | 
			
		||||
    def label_search(self, label: Label) -> List[Label]:
 | 
			
		||||
        return []
 | 
			
		||||
    
 | 
			
		||||
    def artist_search(self, artist: Artist) -> List[Artist]:
 | 
			
		||||
        return []
 | 
			
		||||
    
 | 
			
		||||
    def album_search(self, album: Album) -> List[Album]:
 | 
			
		||||
        return []
 | 
			
		||||
    
 | 
			
		||||
    def song_search(self, song: Song) -> List[Song]:
 | 
			
		||||
        return []
 | 
			
		||||
    
 | 
			
		||||
    def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
 | 
			
		||||
        return Song()
 | 
			
		||||
@@ -296,9 +286,3 @@ class YoutubeMusic(Page):
 | 
			
		||||
 | 
			
		||||
    def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
 | 
			
		||||
        return Artist()
 | 
			
		||||
 | 
			
		||||
    def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
 | 
			
		||||
        return Label()
 | 
			
		||||
 | 
			
		||||
    def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
 | 
			
		||||
        return DownloadResult()
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user