Compare commits

...

3 Commits

Author SHA1 Message Date
ac6c513d56 draft: post process song
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-05-15 12:30:54 +02:00
cc14253239 draft: streaming the audio 2024-05-15 12:18:08 +02:00
14f986a497 draft: rewrote sources 2024-05-15 11:44:39 +02:00
7 changed files with 133 additions and 60 deletions

View File

@ -10,12 +10,12 @@ from ..objects import Target
LOGGER = logging_settings["codex_logger"] LOGGER = logging_settings["codex_logger"]
def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], audio_format: str = main_settings["audio_format"], interval_list: List[Tuple[float, float]] = None): def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], audio_format: str = main_settings["audio_format"], skip_intervals: List[Tuple[float, float]] = None):
if not target.exists: if not target.exists:
LOGGER.warning(f"Target doesn't exist: {target.file_path}") LOGGER.warning(f"Target doesn't exist: {target.file_path}")
return return
interval_list = interval_list or [] skip_intervals = skip_intervals or []
bitrate_b = int(bitrate_kb / 1024) bitrate_b = int(bitrate_kb / 1024)
@ -29,7 +29,7 @@ def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], au
start = 0 start = 0
next_start = 0 next_start = 0
for end, next_start in interval_list: for end, next_start in skip_intervals:
aselect_list.append(f"between(t,{start},{end})") aselect_list.append(f"between(t,{start},{end})")
start = next_start start = next_start
aselect_list.append(f"gte(t,{next_start})") aselect_list.append(f"gte(t,{next_start})")

View File

@ -178,8 +178,6 @@ class Downloader:
page_count = 0 page_count = 0
for option in self.current_results.formatted_generator(): for option in self.current_results.formatted_generator():
if isinstance(option, Option): if isinstance(option, Option):
_downloadable = self.pages.is_downloadable(option.music_object)
r = f"{BColors.GREY.value}{option.index:0{self.option_digits}}{BColors.ENDC.value} {option.music_object.option_string}" r = f"{BColors.GREY.value}{option.index:0{self.option_digits}}{BColors.ENDC.value} {option.music_object.option_string}"
print(r) print(r)
else: else:

View File

@ -16,6 +16,7 @@ from ..objects import (
Artist, Artist,
Label, Label,
) )
from ..audio import write_metadata_to_target, correct_codec
from ..utils.string_processing import fit_to_file_system from ..utils.string_processing import fit_to_file_system
from ..utils.config import youtube_settings, main_settings from ..utils.config import youtube_settings, main_settings
from ..utils.path_manager import LOCATIONS from ..utils.path_manager import LOCATIONS
@ -125,15 +126,10 @@ class Pages:
return data_object return data_object
def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]: def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]:
page: Page = self._get_page_from_enum(source.source_type) if not source.has_page:
if page is None:
return None return None
# getting the appropriate function for the page and the object type func = getattr(source.page, fetch_map[source_type])(source=source, **kwargs)
source_type = page.get_source_type(source)
if not hasattr(page, fetch_map[source_type]):
return None
func = getattr(page, fetch_map[source_type])(source=source, **kwargs)
# fetching the data object and marking it as fetched # fetching the data object and marking it as fetched
data_object: DataObject = func(source=source) data_object: DataObject = func(source=source)
@ -147,15 +143,6 @@ class Pages:
return self.fetch_from_source(source=source) return self.fetch_from_source(source=source)
def is_downloadable(self, music_object: DataObject) -> bool:
_page_types = set(self._source_to_page)
for src in music_object.source_collection.source_pages:
if src in self._source_to_page:
_page_types.add(self._source_to_page[src])
audio_pages = self._audio_pages_set.intersection(_page_types)
return len(audio_pages) > 0
def _skip_object(self, data_object: DataObject) -> bool: def _skip_object(self, data_object: DataObject) -> bool:
if isinstance(data_object, Album): if isinstance(data_object, Album):
if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist: if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist:
@ -224,11 +211,6 @@ class Pages:
return possible_parts return possible_parts
def _get_pages_with_source(self, data_object: DataObject, sort_by_attribute: str = "DOWNLOAD_PRIORITY") -> List[Page]:
pages = [self._get_page_from_enum(s.source_type) for s in data_object.source_collection.get_sources()]
pages.sort(key=lambda p: getattr(p, sort_by_attribute), reverse=True)
return list(pages)
def _download_song(self, song: Song, naming: dict) -> DownloadOptions: def _download_song(self, song: Song, naming: dict) -> DownloadOptions:
""" """
TODO TODO
@ -257,7 +239,6 @@ class Pages:
# manage the targets # manage the targets
tmp: Target = Target.temp(file_extension=main_settings["audio_format"]) tmp: Target = Target.temp(file_extension=main_settings["audio_format"])
found_on_disc = False
song.target_collection.append(Target( song.target_collection.append(Target(
relative_to_music_dir=True, relative_to_music_dir=True,
@ -269,18 +250,54 @@ class Pages:
for target in song.target_collection: for target in song.target_collection:
if target.exists(): if target.exists():
output(f'- {target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY) output(f'- {target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY)
found_on_disc = True
r.found_on_disk += 1 r.found_on_disk += 1
target.copy_content(tmp)
if self.download_options.download_again_if_found:
target.copy_content(tmp)
else: else:
target.create_parent_directories() target.create_parent_directories()
output(f'- {target.file_path}', color=BColors.GREY) output(f'- {target.file_path}', color=BColors.GREY)
# actually download # this streams from every available source until something succeeds, setting the skip intervals to the values of the according source
for page in self._get_pages_with_source(song, sort_by_attribute="DOWNLOAD_PRIORITY"): used_source: Optional[Source] = None
r = page.download_song_to_target(song, tmp, r) skip_intervals: List[Tuple[float, float]] = []
for source in song.source_collection.get_sources(source_type_sorting={
"only_with_page": True,
"sort_key": lambda page: page.download_priority,
"reverse": True,
}):
if tmp.exists:
break
used_source = source
streaming_results = source.page.download_song_to_target(source=source, target=tmp, desc="download")
skip_intervals = source.page.get_skip_intervals(song=song, source=source)
# if something has been downloaded but it somehow failed, delete the file
if streaming_results.is_fatal_error and tmp.exists:
tmp.delete()
# if everything went right, the file should exist now
if not tmp.exists:
if used_source is None:
r.error_message = f"No source found for {song.option_string}."
else:
r.error_message = f"Something went wrong downloading {song.option_string}."
return r
# post process the audio
found_on_disk = used_source is None
if not found_on_disk or self.download_options.process_audio_if_found:
correct_codec(target=tmp, skip_intervals=skip_intervals)
r.sponsor_segments = len(skip_intervals)
if used_source is not None:
used_source.page.post_process_hook(song=song, temp_target=tmp)
if not found_on_disc or self.download_options.process_metadata_if_found:
write_metadata_to_target(metadata=song.metadata, target=tmp, song=song)
tmp.delete()
return r return r
def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DataObject]: def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DataObject]:

View File

@ -2,7 +2,19 @@ from __future__ import annotations
from collections import defaultdict from collections import defaultdict
from enum import Enum from enum import Enum
from typing import List, Dict, Set, Tuple, Optional, Iterable, Generator from typing import (
List,
Dict,
Set,
Tuple,
Optional,
Iterable,
Generator,
TypedDict,
Callable,
Any,
TYPE_CHECKING
)
from urllib.parse import urlparse, ParseResult from urllib.parse import urlparse, ParseResult
from dataclasses import dataclass, field from dataclasses import dataclass, field
from functools import cached_property from functools import cached_property
@ -15,6 +27,8 @@ from ..utils.string_processing import hash_url, shorten_display_url
from .metadata import Mapping, Metadata from .metadata import Mapping, Metadata
from .parents import OuterProxy from .parents import OuterProxy
from .collection import Collection from .collection import Collection
if TYPE_CHECKING:
from ..pages.abstract import Page
@ -30,10 +44,6 @@ class Source:
def __post_init__(self): def __post_init__(self):
self.referrer_page = self.referrer_page or self.source_type self.referrer_page = self.referrer_page or self.source_type
@property
def parsed_url(self) -> ParseResult:
return urlparse(self.url)
@classmethod @classmethod
def match_url(cls, url: str, referrer_page: SourceType) -> Optional[Source]: def match_url(cls, url: str, referrer_page: SourceType) -> Optional[Source]:
""" """
@ -77,6 +87,18 @@ class Source:
if url.startswith("https://myspace.com"): if url.startswith("https://myspace.com"):
return cls(SourceType.MYSPACE, url, referrer_page=referrer_page) return cls(SourceType.MYSPACE, url, referrer_page=referrer_page)
@property
def has_page(self) -> bool:
return self.source_type.page is not None
@property
def page(self) -> Page:
return self.source_type.page
@property
def parsed_url(self) -> ParseResult:
return urlparse(self.url)
@property @property
def hash_url(self) -> str: def hash_url(self) -> str:
return hash_url(self.url) return hash_url(self.url)
@ -99,11 +121,17 @@ class Source:
page_str = property(fget=lambda self: self.source_type.value) page_str = property(fget=lambda self: self.source_type.value)
class SourceTypeSorting(TypedDict):
sort_key: Callable[[SourceType], Any]
reverse: bool
only_with_page: bool
class SourceCollection: class SourceCollection:
__change_version__ = generate_id() __change_version__ = generate_id()
_indexed_sources: Dict[str, Source] _indexed_sources: Dict[str, Source]
_page_to_source_list: Dict[SourceType, List[Source]] _sources_by_type: Dict[SourceType, List[Source]]
def __init__(self, data: Optional[Iterable[Source]] = None, **kwargs): def __init__(self, data: Optional[Iterable[Source]] = None, **kwargs):
self._page_to_source_list = defaultdict(list) self._page_to_source_list = defaultdict(list)
@ -111,15 +139,54 @@ class SourceCollection:
self.extend(data or []) self.extend(data or [])
def has_source_page(self, *source_pages: SourceType) -> bool: def source_types(
return any(source_page in self._page_to_source_list for source_page in source_pages) self,
only_with_page: bool = False,
sort_key = lambda page: page.name,
reverse: bool = False
) -> Iterable[SourceType]:
"""
Returns a list of all source types contained in this source collection.
def get_sources(self, *source_pages: List[Source]) -> Generator[Source]: Args:
if not len(source_pages): only_with_page (bool, optional): If True, only returns source types that have a page, meaning you can download from them.
source_pages = self.source_pages sort_key (function, optional): A function that defines the sorting key for the source types. Defaults to lambda page: page.name.
reverse (bool, optional): If True, sorts the source types in reverse order. Defaults to False.
for page in source_pages: Returns:
yield from self._page_to_source_list[page] Iterable[SourceType]: A list of source types.
"""
source_types: List[SourceType] = self._page_to_source_list.keys()
if only_with_page:
source_types = filter(lambda st: st.has_page, source_types)
return sorted(
source_types,
key=sort_key,
reverse=reverse
)
def get_sources(self, *source_types: List[SourceType], source_type_sorting: SourceTypeSorting = None) -> Generator[Source]:
"""
Retrieves sources based on the provided source types and source type sorting.
Args:
*source_types (List[Source]): Variable number of source types to filter the sources.
source_type_sorting (SourceTypeSorting): Sorting criteria for the source types. This is only relevant if no source types are provided.
Yields:
Generator[Source]: A generator that yields the sources based on the provided filters.
Returns:
None
"""
if not len(source_types):
source_type_sorting = source_type_sorting or {}
source_types = self.source_types(**source_type_sorting)
for source_type in source_types:
yield from self._page_to_source_list[source_type]
def append(self, source: Source): def append(self, source: Source):
if source is None: if source is None:
@ -156,10 +223,6 @@ class SourceCollection:
def __merge__(self, other: SourceCollection, **kwargs): def __merge__(self, other: SourceCollection, **kwargs):
self.extend(other) self.extend(other)
@property
def source_pages(self) -> Iterable[SourceType]:
return sorted(self._page_to_source_list.keys(), key=lambda page: page.value)
@property @property
def hash_url_list(self) -> List[str]: def hash_url_list(self) -> List[str]:
return [hash_url(source.url) for source in self.get_sources()] return [hash_url(source.url) for source in self.get_sources()]
@ -170,7 +233,7 @@ class SourceCollection:
@property @property
def homepage_list(self) -> List[str]: def homepage_list(self) -> List[str]:
return [source.homepage for source in self.source_pages] return [source_type.homepage for source_type in self._sources_by_type.keys()]
def indexing_values(self) -> Generator[Tuple[str, str], None, None]: def indexing_values(self) -> Generator[Tuple[str, str], None, None]:
for index in self._indexed_sources: for index in self._indexed_sources:

View File

@ -246,8 +246,6 @@ class Page:
else: else:
output(f'- {target.file_path}', color=BColors.GREY) output(f'- {target.file_path}', color=BColors.GREY)
if not song.source_collection.has_source_page(self.SOURCE_TYPE):
return DownloadResult(error_message=f"No {self.__class__.__name__} source found for {song.option_string}.")
sources = song.source_collection.get_sources(self.SOURCE_TYPE) sources = song.source_collection.get_sources(self.SOURCE_TYPE)
@ -264,15 +262,15 @@ class Page:
r.merge(self._post_process_targets( r.merge(self._post_process_targets(
song=song, song=song,
temp_target=temp_target, temp_target=temp_target,
interval_list=skip_intervals, skip_intervals=skip_intervals,
found_on_disc=found_on_disc, found_on_disc=found_on_disc,
)) ))
return r return r
def _post_process_targets(self, song: Song, temp_target: Target, interval_list: List, found_on_disc: bool) -> DownloadResult: def _post_process_targets(self, song: Song, temp_target: Target, skip_intervals: List, found_on_disc: bool) -> DownloadResult:
if not found_on_disc or self.download_options.process_audio_if_found: if not found_on_disc or self.download_options.process_audio_if_found:
correct_codec(temp_target, interval_list=interval_list) correct_codec(temp_target, skip_intervals=skip_intervals)
self.post_process_hook(song, temp_target) self.post_process_hook(song, temp_target)
@ -288,7 +286,7 @@ class Page:
r.add_target(target) r.add_target(target)
temp_target.delete() temp_target.delete()
r.sponsor_segments += len(interval_list) r.sponsor_segments += len(skip_intervals)
return r return r

View File

@ -51,7 +51,6 @@ class BandcampTypes(Enum):
class Bandcamp(Page): class Bandcamp(Page):
SOURCE_TYPE = ALL_SOURCE_TYPES.BANDCAMP SOURCE_TYPE = ALL_SOURCE_TYPES.BANDCAMP
LOGGER = logging_settings["bandcamp_logger"]
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.connection: Connection = Connection( self.connection: Connection = Connection(

View File

@ -41,8 +41,6 @@ class YouTube(SuperYouTube):
# CHANGE # CHANGE
SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE
NO_ADDITIONAL_DATA_FROM_SONG = False
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.connection: Connection = Connection( self.connection: Connection = Connection(
host=get_invidious_url(), host=get_invidious_url(),