feat: moved id parsing in class
This commit is contained in:
parent
0f01238365
commit
e28ae87bbe
@ -2,30 +2,10 @@ import requests
|
|||||||
import logging
|
import logging
|
||||||
from urllib.parse import urlparse, urlunparse, parse_qs
|
from urllib.parse import urlparse, urlunparse, parse_qs
|
||||||
import re
|
import re
|
||||||
from typing import Optional
|
from typing import Optional, List
|
||||||
|
|
||||||
from .exceptions import SponsorBlockError, SponsorBlockIdNotFoundError
|
from .exceptions import SponsorBlockError, SponsorBlockIdNotFoundError
|
||||||
|
from .constants import Segment
|
||||||
|
|
||||||
def _get_video_id(i: str, silent: bool = False) -> None:
|
|
||||||
# check with regex if i is already an id like r1Fa1iWJVEA
|
|
||||||
if re.match(r"^[a-zA-Z0-9_-]{11}$", i):
|
|
||||||
return i.strip()
|
|
||||||
|
|
||||||
url = urlparse(url=i)
|
|
||||||
|
|
||||||
if url.netloc == "youtu.be":
|
|
||||||
return url.path[1:]
|
|
||||||
|
|
||||||
type_frag_list = url.path.split("/")
|
|
||||||
|
|
||||||
query_stuff = parse_qs(url.query)
|
|
||||||
if "v" not in query_stuff:
|
|
||||||
if not silent:
|
|
||||||
raise SponsorBlockIdNotFoundError("No video id found in the url")
|
|
||||||
return None
|
|
||||||
else:
|
|
||||||
return query_stuff["v"][0]
|
|
||||||
|
|
||||||
class SponsorBlock:
|
class SponsorBlock:
|
||||||
def __init__(self, session: requests.Session = None, base_url: str = "https://sponsor.ajay.app", silent: bool = False, _requests_logging_exists: bool = False):
|
def __init__(self, session: requests.Session = None, base_url: str = "https://sponsor.ajay.app", silent: bool = False, _requests_logging_exists: bool = False):
|
||||||
@ -37,6 +17,25 @@ class SponsorBlock:
|
|||||||
|
|
||||||
self.logger: logging.Logger = logging.Logger("SponsorBlock")
|
self.logger: logging.Logger = logging.Logger("SponsorBlock")
|
||||||
|
|
||||||
|
def _get_video_id(self, video: str) -> Optional[str]:
|
||||||
|
if re.match(r"^[a-zA-Z0-9_-]{11}$", video):
|
||||||
|
return video.strip()
|
||||||
|
|
||||||
|
url = urlparse(url=video)
|
||||||
|
|
||||||
|
if url.netloc == "youtu.be":
|
||||||
|
return url.path[1:]
|
||||||
|
|
||||||
|
type_frag_list = url.path.split("/")
|
||||||
|
|
||||||
|
query_stuff = parse_qs(url.query)
|
||||||
|
if "v" not in query_stuff:
|
||||||
|
if not self.silent:
|
||||||
|
raise SponsorBlockIdNotFoundError("No video id found in the url")
|
||||||
|
return None
|
||||||
|
else:
|
||||||
|
return query_stuff["v"][0]
|
||||||
|
|
||||||
def _request(self, method: str, endpoint: str) -> Optional[requests.Response]:
|
def _request(self, method: str, endpoint: str) -> Optional[requests.Response]:
|
||||||
error_message = ""
|
error_message = ""
|
||||||
url = self.base_url + endpoint
|
url = self.base_url + endpoint
|
||||||
@ -55,4 +54,8 @@ class SponsorBlock:
|
|||||||
if not self.silent:
|
if not self.silent:
|
||||||
raise exceptions.SponsorBlockConnectionError(error_message)
|
raise exceptions.SponsorBlockConnectionError(error_message)
|
||||||
|
|
||||||
return r
|
return r
|
||||||
|
|
||||||
|
def get_segments(video: str) -> List[Segment]:
|
||||||
|
video_id = _get_video_id
|
||||||
|
r: List[Segment] = []
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from python_sponsorblock import _get_video_id, exceptions
|
from python_sponsorblock import SponsorBlock, exceptions
|
||||||
|
|
||||||
|
|
||||||
MAGIC_ID = "dQw4w9WgXcQ"
|
MAGIC_ID = "dQw4w9WgXcQ"
|
||||||
@ -8,25 +8,32 @@ MAGIC_ID = "dQw4w9WgXcQ"
|
|||||||
|
|
||||||
class TestVideoIdParsing(unittest.TestCase):
|
class TestVideoIdParsing(unittest.TestCase):
|
||||||
def test_already_parsed_id(self):
|
def test_already_parsed_id(self):
|
||||||
self.assertEqual(_get_video_id(MAGIC_ID), MAGIC_ID)
|
s = SponsorBlock()
|
||||||
|
self.assertEqual(s._get_video_id(MAGIC_ID), MAGIC_ID)
|
||||||
|
|
||||||
def test_non_existing_id(self):
|
def test_non_existing_id(self):
|
||||||
|
s = SponsorBlock()
|
||||||
with self.assertRaises(exceptions.SponsorBlockIdNotFoundError):
|
with self.assertRaises(exceptions.SponsorBlockIdNotFoundError):
|
||||||
_get_video_id("eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee")
|
s._get_video_id("eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee")
|
||||||
|
|
||||||
self.assertEqual(_get_video_id("eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee", silent=True), None)
|
s = SponsorBlock(silent=True)
|
||||||
|
self.assertEqual(s._get_video_id("eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee"), None)
|
||||||
|
|
||||||
def test_youtu_dot_be(self):
|
def test_youtu_dot_be(self):
|
||||||
self.assertEqual(_get_video_id("https://youtu.be/" + MAGIC_ID), MAGIC_ID)
|
s = SponsorBlock()
|
||||||
|
self.assertEqual(s._get_video_id("https://youtu.be/" + MAGIC_ID), MAGIC_ID)
|
||||||
|
|
||||||
def test_youtube_dot_com(self):
|
def test_youtube_dot_com(self):
|
||||||
self.assertEqual(_get_video_id("https://www.youtube.com/watch?v=" + MAGIC_ID), MAGIC_ID)
|
s = SponsorBlock()
|
||||||
|
self.assertEqual(s._get_video_id("https://www.youtube.com/watch?v=" + MAGIC_ID), MAGIC_ID)
|
||||||
|
|
||||||
def test_invidious(self):
|
def test_invidious(self):
|
||||||
self.assertEqual(_get_video_id("https://invidio.us/watch?v=" + MAGIC_ID), MAGIC_ID)
|
s = SponsorBlock()
|
||||||
|
self.assertEqual(s._get_video_id("https://invidio.us/watch?v=" + MAGIC_ID), MAGIC_ID)
|
||||||
|
|
||||||
def test_piped(self):
|
def test_piped(self):
|
||||||
self.assertEqual(_get_video_id("https://piped.video/watch?v=" + MAGIC_ID), MAGIC_ID)
|
s = SponsorBlock()
|
||||||
|
self.assertEqual(s._get_video_id("https://piped.video/watch?v=" + MAGIC_ID), MAGIC_ID)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
Loading…
Reference in New Issue
Block a user