completely implemented downloading from youtube

This commit is contained in:
Hellow 2023-06-13 20:10:11 +02:00
parent 79c38387c8
commit 73576b0453
4 changed files with 168 additions and 20 deletions

View File

@ -14,7 +14,8 @@ if __name__ == "__main__":
youtube_search = [ youtube_search = [
"s: #a Zombiez", "s: #a Zombiez",
"10" "10",
"8"
] ]
music_kraken.cli(genre="test", command_list=youtube_search) music_kraken.cli(genre="test", command_list=youtube_search)

View File

@ -1,3 +1,4 @@
import time
from typing import List, Dict, Callable, Optional, Set from typing import List, Dict, Callable, Optional, Set
from urllib.parse import urlparse, urlunsplit, ParseResult from urllib.parse import urlparse, urlunsplit, ParseResult
import logging import logging
@ -82,6 +83,7 @@ class Connection:
headers: dict, headers: dict,
refer_from_origin: bool = True, refer_from_origin: bool = True,
raw_url: bool = False, raw_url: bool = False,
wait_on_403: bool = True,
**kwargs **kwargs
) -> Optional[requests.Response]: ) -> Optional[requests.Response]:
if try_count >= self.TRIES: if try_count >= self.TRIES:
@ -122,12 +124,15 @@ class Connection:
self.LOGGER.warning(f"{self.HOST.netloc} responded wit {r.status_code} " self.LOGGER.warning(f"{self.HOST.netloc} responded wit {r.status_code} "
f"at {url}. ({try_count}-{self.TRIES})") f"at {url}. ({try_count}-{self.TRIES})")
self.LOGGER.debug(r.content) self.LOGGER.debug(r.content)
if wait_on_403:
self.LOGGER.warning(f"Waiting for 5 seconds.")
time.sleep(5)
self.rotate() self.rotate()
return self._request( return self._request(
request=request, request=request,
try_count=try_count, try_count=try_count+1,
accepted_response_code=accepted_response_code, accepted_response_code=accepted_response_code,
url=url, url=url,
timeout=timeout, timeout=timeout,

View File

@ -205,6 +205,19 @@ class ID3Timestamp:
time_format = self.get_time_format() time_format = self.get_time_format()
return time_format, self.date_obj.strftime(time_format) return time_format, self.date_obj.strftime(time_format)
@classmethod
def fromtimestamp(cls, utc_timestamp: int):
date_obj = datetime.datetime.fromtimestamp(utc_timestamp)
return cls(
year=date_obj.year,
month=date_obj.month,
day=date_obj.day,
hour=date_obj.hour,
minute=date_obj.minute,
second=date_obj.second
)
@classmethod @classmethod
def strptime(cls, time_stamp: str, format: str): def strptime(cls, time_stamp: str, format: str):
""" """

View File

@ -1,7 +1,5 @@
from typing import List, Optional, Type from typing import List, Optional, Type, Tuple
from urllib.parse import urlparse, urlunparse, parse_qs from urllib.parse import urlparse, urlunparse, parse_qs
import logging
from dataclasses import dataclass
from enum import Enum from enum import Enum
from ..objects import Source, DatabaseObject from ..objects import Source, DatabaseObject
@ -13,11 +11,13 @@ from ..objects import (
Song, Song,
Album, Album,
Label, Label,
Target Target,
FormattedText,
ID3Timestamp
) )
from ..connection import Connection from ..connection import Connection
from ..utils.support_classes import DownloadResult from ..utils.support_classes import DownloadResult
from ..utils.shared import YOUTUBE_LOGGER, INVIDIOUS_INSTANCE from ..utils.shared import YOUTUBE_LOGGER, INVIDIOUS_INSTANCE, BITRATE
""" """
@ -83,14 +83,14 @@ class YouTubeUrl:
if "list" not in query_stuff: if "list" not in query_stuff:
self.couldnt_find_id(url) self.couldnt_find_id(url)
else: else:
self.id = query_stuff["list"] self.id = query_stuff["list"][0]
elif self.url_type == YouTubeUrlType.VIDEO: elif self.url_type == YouTubeUrlType.VIDEO:
query_stuff = parse_qs(parsed.query) query_stuff = parse_qs(parsed.query)
if "v" not in query_stuff: if "v" not in query_stuff:
self.couldnt_find_id(url) self.couldnt_find_id(url)
else: else:
self.id = query_stuff["v"] self.id = query_stuff["v"][0]
def couldnt_find_id(self, url: str): def couldnt_find_id(self, url: str):
@ -133,6 +133,11 @@ class YouTube(Page):
logger=self.LOGGER logger=self.LOGGER
) )
self.download_connection: Connection = Connection(
host="https://www.youtube.com/",
logger=self.LOGGER
)
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]: def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
@ -147,11 +152,7 @@ class YouTube(Page):
return _url_type[parsed.url_type] return _url_type[parsed.url_type]
def general_search(self, search_query: str) -> List[DatabaseObject]: def general_search(self, search_query: str) -> List[DatabaseObject]:
return self.artist_search(Artist(name=search_query, dynamic=True))
return [Artist(name="works")]
def label_search(self, label: Label) -> List[Label]:
return []
def _json_to_artist(self, artist_json: dict) -> Artist:# def _json_to_artist(self, artist_json: dict) -> Artist:#
return Artist( return Artist(
@ -168,6 +169,9 @@ class YouTube(Page):
artist_list = [] artist_list = []
r = self.connection.get(endpoint) r = self.connection.get(endpoint)
if r is None:
return []
for search_result in r.json(): for search_result in r.json():
if search_result["type"] != "channel": if search_result["type"] != "channel":
continue continue
@ -185,11 +189,90 @@ class YouTube(Page):
def song_search(self, song: Song) -> List[Song]: def song_search(self, song: Song) -> List[Song]:
return [] return []
def _fetch_song_from_id(self, youtube_id: str) -> Tuple[Song, Optional[int]]:
# https://yt.artemislena.eu/api/v1/videos/SULFl39UjgY
r = self.connection.get(get_invidious_url(path=f"/api/v1/videos/{youtube_id}"))
if r is None:
return Song(), None
data = r.json()
if data["genre"] != "Music":
self.LOGGER.warning(f"Genre has to be music, trying anyways")
title = data["title"]
license_str = None
for music_track in data.get("musicTracks", []):
title = music_track["song"]
license_str = music_track["license"]
return Song(
title=title,
source_list=[Source(
self.SOURCE_TYPE, get_invidious_url(path="/watch", query=f"v={data['videoId']}")
)],
notes=FormattedText(html=data["descriptionHtml"] + f"\n<p>{license_str}</ p>" )
), int(data["published"])
def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song: def fetch_song(self, source: Source, stop_at_level: int = 1) -> Song:
return Song() parsed = YouTubeUrl(source.url)
if parsed.url_type != YouTubeUrlType.VIDEO:
return Song()
song, _ = self._fetch_song_from_id(parsed.id)
return song
def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album: def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
return Album() parsed = YouTubeUrl(source.url)
if parsed.url_type != YouTubeUrlType.PLAYLIST:
return Album()
title = None
source_list = [source]
notes = None
song_list = []
# https://yt.artemislena.eu/api/v1/playlists/OLAK5uy_kcUBiDv5ATbl-R20OjNaZ5G28XFanQOmM
r = self.connection.get(get_invidious_url(path=f"/api/v1/playlists/{parsed.id}"))
if r is None:
return Album()
data = r.json()
if data["type"] != "playlist":
return Album()
title = data["title"]
notes = FormattedText(html=data["descriptionHtml"])
timestamps: List[int] = []
"""
TODO
fetch the song and don't get it from there
"""
for video in data["videos"]:
other_song = Song(
source_list=[
Source(
self.SOURCE_TYPE, get_invidious_url(path="/watch", query=f"v={video['videoId']}")
)
],
tracksort=video["index"]+1
)
song, utc_timestamp = self._fetch_song_from_id(video["videoId"])
song.merge(other_song)
if utc_timestamp is not None:
timestamps.append(utc_timestamp)
song_list.append(song)
return Album(
title=title,
source_list=source_list,
notes=notes,
song_list=song_list,
date=ID3Timestamp.fromtimestamp(round(sum(timestamps) / len(timestamps)))
)
def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist: def fetch_artist(self, source: Source, stop_at_level: int = 1) -> Artist:
parsed = YouTubeUrl(source.url) parsed = YouTubeUrl(source.url)
@ -202,6 +285,9 @@ class YouTube(Page):
# playlist # playlist
# https://yt.artemislena.eu/api/v1/channels/playlists/UCV0Ntl3lVR7xDXKoCU6uUXA # https://yt.artemislena.eu/api/v1/channels/playlists/UCV0Ntl3lVR7xDXKoCU6uUXA
r = self.connection.get(get_invidious_url(f"/api/v1/channels/playlists/{parsed.id}")) r = self.connection.get(get_invidious_url(f"/api/v1/channels/playlists/{parsed.id}"))
if r is None:
return Artist()
for playlist_json in r.json()["playlists"]: for playlist_json in r.json()["playlists"]:
if playlist_json["type"] != "playlist": if playlist_json["type"] != "playlist":
continue continue
@ -224,8 +310,51 @@ class YouTube(Page):
return Artist(name=artist_name, main_album_list=album_list, source_list=[source]) return Artist(name=artist_name, main_album_list=album_list, source_list=[source])
def fetch_label(self, source: Source, stop_at_level: int = 1) -> Label:
return Label()
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult: def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
return DownloadResult() """
1. getting the optimal source
Only audio sources allowed
not a bitrate that is smaller than the selected bitrate, but not one that is wayyy huger
2. download it
:param source:
:param target:
:param desc:
:return:
"""
r = self.connection.get(YouTubeUrl(source.url).api)
if r is None:
return DownloadResult(error_message="Api didn't even respond, maybe try another invidious Instance")
audio_format = None
best_bitrate = 0
for possible_format in r.json()["adaptiveFormats"]:
format_type: str = possible_format["type"]
if not format_type.startswith("audio"):
continue
bitrate = int(possible_format.get("bitrate", 0))
if bitrate >= BITRATE:
best_bitrate = bitrate
audio_format = possible_format
break
if bitrate > best_bitrate:
best_bitrate = bitrate
audio_format = possible_format
if audio_format is None:
return DownloadResult(error_message="Couldn't find the download link.")
endpoint = audio_format["url"]
r = self.download_connection.get(endpoint, stream=True, raw_url=True)
if r is None:
return DownloadResult(error_message=f"Couldn't connect to {endpoint}")
if target.stream_into(r, desc=desc):
return DownloadResult(total=1)
return DownloadResult(error_message=f"Streaming to the file went wrong: {endpoint}, {str(target.file_path)}")