roughly finished implementing sponsorblock
This commit is contained in:
parent
1e49089de9
commit
cc5c7809fb
@ -1,5 +1,6 @@
|
|||||||
from typing import List, Tuple
|
from typing import List, Tuple
|
||||||
import ffmpeg
|
import ffmpeg
|
||||||
|
import subprocess
|
||||||
|
|
||||||
from ..utils.shared import BITRATE, AUDIO_FORMAT, CODEX_LOGGER as LOGGER
|
from ..utils.shared import BITRATE, AUDIO_FORMAT, CODEX_LOGGER as LOGGER
|
||||||
from ..objects import Target
|
from ..objects import Target
|
||||||
@ -10,6 +11,12 @@ def remove_intervals(target: Target, interval_list: List[Tuple[float, float]]):
|
|||||||
LOGGER.warning(f"Target doesn't exist: {target.file_path}")
|
LOGGER.warning(f"Target doesn't exist: {target.file_path}")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
temp_target = Target(
|
||||||
|
path=target._path,
|
||||||
|
file=str(target._file) + ".temp"
|
||||||
|
)
|
||||||
|
target.copy_content(temp_target)
|
||||||
|
|
||||||
# https://stackoverflow.com/questions/50594412/cut-multiple-parts-of-a-video-with-ffmpeg
|
# https://stackoverflow.com/questions/50594412/cut-multiple-parts-of-a-video-with-ffmpeg
|
||||||
aselect_list: List[str] = []
|
aselect_list: List[str] = []
|
||||||
|
|
||||||
@ -23,6 +30,13 @@ def remove_intervals(target: Target, interval_list: List[Tuple[float, float]]):
|
|||||||
|
|
||||||
select = f"aselect='{'+'.join(aselect_list)}',asetpts=N/SR/TB"
|
select = f"aselect='{'+'.join(aselect_list)}',asetpts=N/SR/TB"
|
||||||
|
|
||||||
|
command = f"ffmpeg -i {str(temp_target.file_path)} -af \"{select}\" {str(target.file_path)}"
|
||||||
|
r = subprocess.run(command)
|
||||||
|
if r.stderr != "":
|
||||||
|
LOGGER.debug(r.stderr)
|
||||||
|
|
||||||
|
temp_target.delete()
|
||||||
|
|
||||||
|
|
||||||
def correct_codec(target: Target, bitrate_kb: int = BITRATE, audio_format: str = AUDIO_FORMAT):
|
def correct_codec(target: Target, bitrate_kb: int = BITRATE, audio_format: str = AUDIO_FORMAT):
|
||||||
if not target.exists:
|
if not target.exists:
|
||||||
@ -49,4 +63,4 @@ def correct_codec(target: Target, bitrate_kb: int = BITRATE, audio_format: str =
|
|||||||
LOGGER.debug(err)
|
LOGGER.debug(err)
|
||||||
|
|
||||||
output_target.copy_content(target)
|
output_target.copy_content(target)
|
||||||
output_target.file_path.unlink()
|
output_target.delete()
|
||||||
|
@ -354,6 +354,9 @@ class Page:
|
|||||||
|
|
||||||
def _post_process_targets(self, song: Song, temp_target: Target) -> DownloadResult:
|
def _post_process_targets(self, song: Song, temp_target: Target) -> DownloadResult:
|
||||||
correct_codec(temp_target)
|
correct_codec(temp_target)
|
||||||
|
|
||||||
|
self.post_process_hook(song, temp_target)
|
||||||
|
|
||||||
write_metadata_to_target(song.metadata, temp_target)
|
write_metadata_to_target(song.metadata, temp_target)
|
||||||
|
|
||||||
r = DownloadResult()
|
r = DownloadResult()
|
||||||
@ -368,5 +371,8 @@ class Page:
|
|||||||
|
|
||||||
return r
|
return r
|
||||||
|
|
||||||
|
def post_process_hook(self, song: Song, temp_target: Target, **kwargs):
|
||||||
|
pass
|
||||||
|
|
||||||
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
|
def download_song_to_target(self, source: Source, target: Target, desc: str = None) -> DownloadResult:
|
||||||
return DownloadResult()
|
return DownloadResult()
|
||||||
|
@ -2,6 +2,11 @@ from typing import List, Optional, Type, Tuple
|
|||||||
from urllib.parse import urlparse, urlunparse, parse_qs
|
from urllib.parse import urlparse, urlunparse, parse_qs
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
|
|
||||||
|
import sponsorblock
|
||||||
|
|
||||||
|
from music_kraken.objects import Song, Target
|
||||||
|
from music_kraken.utils.support_classes import DownloadResult
|
||||||
|
|
||||||
from ..objects import Source, DatabaseObject
|
from ..objects import Source, DatabaseObject
|
||||||
from .abstract import Page
|
from .abstract import Page
|
||||||
from ..objects import (
|
from ..objects import (
|
||||||
@ -15,9 +20,10 @@ from ..objects import (
|
|||||||
FormattedText,
|
FormattedText,
|
||||||
ID3Timestamp
|
ID3Timestamp
|
||||||
)
|
)
|
||||||
|
from ..audio.codec import remove_intervals
|
||||||
from ..connection import Connection
|
from ..connection import Connection
|
||||||
from ..utils.support_classes import DownloadResult
|
from ..utils.support_classes import DownloadResult
|
||||||
from ..utils.shared import YOUTUBE_LOGGER, INVIDIOUS_INSTANCE, BITRATE
|
from ..utils.shared import YOUTUBE_LOGGER, INVIDIOUS_INSTANCE, BITRATE, ENABLE_SPONSOR_BLOCK
|
||||||
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@ -138,6 +144,10 @@ class YouTube(Page):
|
|||||||
logger=self.LOGGER
|
logger=self.LOGGER
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# the stuff with the connection is, to ensure sponsorblock uses the proxies, my programm does
|
||||||
|
_sponsorblock_connection: Connection = Connection()
|
||||||
|
self.sponsorblock_client = sponsorblock.Client(session=_sponsorblock_connection.session)
|
||||||
|
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
|
def get_source_type(self, source: Source) -> Optional[Type[DatabaseObject]]:
|
||||||
@ -375,3 +385,23 @@ class YouTube(Page):
|
|||||||
if target.stream_into(r, desc=desc):
|
if target.stream_into(r, desc=desc):
|
||||||
return DownloadResult(total=1)
|
return DownloadResult(total=1)
|
||||||
return DownloadResult(error_message=f"Streaming to the file went wrong: {endpoint}, {str(target.file_path)}")
|
return DownloadResult(error_message=f"Streaming to the file went wrong: {endpoint}, {str(target.file_path)}")
|
||||||
|
|
||||||
|
def post_process_hook(self, song: Song, temp_target: Target, **kwargs):
|
||||||
|
if not ENABLE_SPONSOR_BLOCK:
|
||||||
|
return
|
||||||
|
|
||||||
|
# find the youtube id
|
||||||
|
source_list = song.source_collection.get_sources_from_page(self.SOURCE_TYPE)
|
||||||
|
if len(source_list) <= 0:
|
||||||
|
self.LOGGER.warning(f"Couldn't find a youtube source in the post_processing_hook for {song.option_string}")
|
||||||
|
return
|
||||||
|
|
||||||
|
source = source_list[0]
|
||||||
|
parsed = YouTubeUrl(source.url)
|
||||||
|
if parsed.url_type != YouTubeUrlType.VIDEO:
|
||||||
|
self.LOGGER.warning(f"{source.url} is no video url.")
|
||||||
|
return
|
||||||
|
|
||||||
|
# call the sponsorblock api, and remove the segments from the audio
|
||||||
|
segments = self.sponsorblock_client.get_skip_segments(video_id=parsed.id)
|
||||||
|
remove_intervals(temp_target, [(segment.start, segment.end) for segment in segments])
|
||||||
|
@ -92,6 +92,7 @@ if TOR:
|
|||||||
'https': f'socks5h://127.0.0.1:{CONNECTION_SECTION.TOR_PORT.object_from_value}'
|
'https': f'socks5h://127.0.0.1:{CONNECTION_SECTION.TOR_PORT.object_from_value}'
|
||||||
}
|
}
|
||||||
INVIDIOUS_INSTANCE: ParseResult = CONNECTION_SECTION.INVIDIOUS_INSTANCE.object_from_value
|
INVIDIOUS_INSTANCE: ParseResult = CONNECTION_SECTION.INVIDIOUS_INSTANCE.object_from_value
|
||||||
|
ENABLE_SPONSOR_BLOCK: bool = CONNECTION_SECTION.SPONSOR_BLOCK.object_from_value
|
||||||
|
|
||||||
# size of the chunks that are streamed
|
# size of the chunks that are streamed
|
||||||
CHUNK_SIZE = CONNECTION_SECTION.CHUNK_SIZE.object_from_value
|
CHUNK_SIZE = CONNECTION_SECTION.CHUNK_SIZE.object_from_value
|
||||||
|
Loading…
Reference in New Issue
Block a user