Compare commits
3 Commits
da8887b279
...
ac6c513d56
Author | SHA1 | Date | |
---|---|---|---|
ac6c513d56 | |||
cc14253239 | |||
14f986a497 |
@ -10,12 +10,12 @@ from ..objects import Target
|
||||
LOGGER = logging_settings["codex_logger"]
|
||||
|
||||
|
||||
def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], audio_format: str = main_settings["audio_format"], interval_list: List[Tuple[float, float]] = None):
|
||||
def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], audio_format: str = main_settings["audio_format"], skip_intervals: List[Tuple[float, float]] = None):
|
||||
if not target.exists:
|
||||
LOGGER.warning(f"Target doesn't exist: {target.file_path}")
|
||||
return
|
||||
|
||||
interval_list = interval_list or []
|
||||
skip_intervals = skip_intervals or []
|
||||
|
||||
bitrate_b = int(bitrate_kb / 1024)
|
||||
|
||||
@ -29,7 +29,7 @@ def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], au
|
||||
|
||||
start = 0
|
||||
next_start = 0
|
||||
for end, next_start in interval_list:
|
||||
for end, next_start in skip_intervals:
|
||||
aselect_list.append(f"between(t,{start},{end})")
|
||||
start = next_start
|
||||
aselect_list.append(f"gte(t,{next_start})")
|
||||
|
@ -178,8 +178,6 @@ class Downloader:
|
||||
page_count = 0
|
||||
for option in self.current_results.formatted_generator():
|
||||
if isinstance(option, Option):
|
||||
_downloadable = self.pages.is_downloadable(option.music_object)
|
||||
|
||||
r = f"{BColors.GREY.value}{option.index:0{self.option_digits}}{BColors.ENDC.value} {option.music_object.option_string}"
|
||||
print(r)
|
||||
else:
|
||||
|
@ -16,6 +16,7 @@ from ..objects import (
|
||||
Artist,
|
||||
Label,
|
||||
)
|
||||
from ..audio import write_metadata_to_target, correct_codec
|
||||
from ..utils.string_processing import fit_to_file_system
|
||||
from ..utils.config import youtube_settings, main_settings
|
||||
from ..utils.path_manager import LOCATIONS
|
||||
@ -125,15 +126,10 @@ class Pages:
|
||||
return data_object
|
||||
|
||||
def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]:
|
||||
page: Page = self._get_page_from_enum(source.source_type)
|
||||
if page is None:
|
||||
if not source.has_page:
|
||||
return None
|
||||
|
||||
# getting the appropriate function for the page and the object type
|
||||
source_type = page.get_source_type(source)
|
||||
if not hasattr(page, fetch_map[source_type]):
|
||||
return None
|
||||
func = getattr(page, fetch_map[source_type])(source=source, **kwargs)
|
||||
func = getattr(source.page, fetch_map[source_type])(source=source, **kwargs)
|
||||
|
||||
# fetching the data object and marking it as fetched
|
||||
data_object: DataObject = func(source=source)
|
||||
@ -147,15 +143,6 @@ class Pages:
|
||||
|
||||
return self.fetch_from_source(source=source)
|
||||
|
||||
def is_downloadable(self, music_object: DataObject) -> bool:
|
||||
_page_types = set(self._source_to_page)
|
||||
for src in music_object.source_collection.source_pages:
|
||||
if src in self._source_to_page:
|
||||
_page_types.add(self._source_to_page[src])
|
||||
|
||||
audio_pages = self._audio_pages_set.intersection(_page_types)
|
||||
return len(audio_pages) > 0
|
||||
|
||||
def _skip_object(self, data_object: DataObject) -> bool:
|
||||
if isinstance(data_object, Album):
|
||||
if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist:
|
||||
@ -224,11 +211,6 @@ class Pages:
|
||||
|
||||
return possible_parts
|
||||
|
||||
def _get_pages_with_source(self, data_object: DataObject, sort_by_attribute: str = "DOWNLOAD_PRIORITY") -> List[Page]:
|
||||
pages = [self._get_page_from_enum(s.source_type) for s in data_object.source_collection.get_sources()]
|
||||
pages.sort(key=lambda p: getattr(p, sort_by_attribute), reverse=True)
|
||||
return list(pages)
|
||||
|
||||
def _download_song(self, song: Song, naming: dict) -> DownloadOptions:
|
||||
"""
|
||||
TODO
|
||||
@ -257,7 +239,6 @@ class Pages:
|
||||
|
||||
# manage the targets
|
||||
tmp: Target = Target.temp(file_extension=main_settings["audio_format"])
|
||||
found_on_disc = False
|
||||
|
||||
song.target_collection.append(Target(
|
||||
relative_to_music_dir=True,
|
||||
@ -269,18 +250,54 @@ class Pages:
|
||||
for target in song.target_collection:
|
||||
if target.exists():
|
||||
output(f'- {target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY)
|
||||
|
||||
found_on_disc = True
|
||||
r.found_on_disk += 1
|
||||
|
||||
if self.download_options.download_again_if_found:
|
||||
target.copy_content(tmp)
|
||||
else:
|
||||
target.create_parent_directories()
|
||||
output(f'- {target.file_path}', color=BColors.GREY)
|
||||
|
||||
# actually download
|
||||
for page in self._get_pages_with_source(song, sort_by_attribute="DOWNLOAD_PRIORITY"):
|
||||
r = page.download_song_to_target(song, tmp, r)
|
||||
# this streams from every available source until something succeeds, setting the skip intervals to the values of the according source
|
||||
used_source: Optional[Source] = None
|
||||
skip_intervals: List[Tuple[float, float]] = []
|
||||
for source in song.source_collection.get_sources(source_type_sorting={
|
||||
"only_with_page": True,
|
||||
"sort_key": lambda page: page.download_priority,
|
||||
"reverse": True,
|
||||
}):
|
||||
if tmp.exists:
|
||||
break
|
||||
|
||||
used_source = source
|
||||
streaming_results = source.page.download_song_to_target(source=source, target=tmp, desc="download")
|
||||
skip_intervals = source.page.get_skip_intervals(song=song, source=source)
|
||||
|
||||
# if something has been downloaded but it somehow failed, delete the file
|
||||
if streaming_results.is_fatal_error and tmp.exists:
|
||||
tmp.delete()
|
||||
|
||||
# if everything went right, the file should exist now
|
||||
if not tmp.exists:
|
||||
if used_source is None:
|
||||
r.error_message = f"No source found for {song.option_string}."
|
||||
else:
|
||||
r.error_message = f"Something went wrong downloading {song.option_string}."
|
||||
return r
|
||||
|
||||
# post process the audio
|
||||
found_on_disk = used_source is None
|
||||
if not found_on_disk or self.download_options.process_audio_if_found:
|
||||
correct_codec(target=tmp, skip_intervals=skip_intervals)
|
||||
r.sponsor_segments = len(skip_intervals)
|
||||
|
||||
if used_source is not None:
|
||||
used_source.page.post_process_hook(song=song, temp_target=tmp)
|
||||
|
||||
if not found_on_disc or self.download_options.process_metadata_if_found:
|
||||
write_metadata_to_target(metadata=song.metadata, target=tmp, song=song)
|
||||
|
||||
tmp.delete()
|
||||
return r
|
||||
|
||||
def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DataObject]:
|
||||
|
@ -2,7 +2,19 @@ from __future__ import annotations
|
||||
|
||||
from collections import defaultdict
|
||||
from enum import Enum
|
||||
from typing import List, Dict, Set, Tuple, Optional, Iterable, Generator
|
||||
from typing import (
|
||||
List,
|
||||
Dict,
|
||||
Set,
|
||||
Tuple,
|
||||
Optional,
|
||||
Iterable,
|
||||
Generator,
|
||||
TypedDict,
|
||||
Callable,
|
||||
Any,
|
||||
TYPE_CHECKING
|
||||
)
|
||||
from urllib.parse import urlparse, ParseResult
|
||||
from dataclasses import dataclass, field
|
||||
from functools import cached_property
|
||||
@ -15,6 +27,8 @@ from ..utils.string_processing import hash_url, shorten_display_url
|
||||
from .metadata import Mapping, Metadata
|
||||
from .parents import OuterProxy
|
||||
from .collection import Collection
|
||||
if TYPE_CHECKING:
|
||||
from ..pages.abstract import Page
|
||||
|
||||
|
||||
|
||||
@ -30,10 +44,6 @@ class Source:
|
||||
def __post_init__(self):
|
||||
self.referrer_page = self.referrer_page or self.source_type
|
||||
|
||||
@property
|
||||
def parsed_url(self) -> ParseResult:
|
||||
return urlparse(self.url)
|
||||
|
||||
@classmethod
|
||||
def match_url(cls, url: str, referrer_page: SourceType) -> Optional[Source]:
|
||||
"""
|
||||
@ -77,6 +87,18 @@ class Source:
|
||||
if url.startswith("https://myspace.com"):
|
||||
return cls(SourceType.MYSPACE, url, referrer_page=referrer_page)
|
||||
|
||||
@property
|
||||
def has_page(self) -> bool:
|
||||
return self.source_type.page is not None
|
||||
|
||||
@property
|
||||
def page(self) -> Page:
|
||||
return self.source_type.page
|
||||
|
||||
@property
|
||||
def parsed_url(self) -> ParseResult:
|
||||
return urlparse(self.url)
|
||||
|
||||
@property
|
||||
def hash_url(self) -> str:
|
||||
return hash_url(self.url)
|
||||
@ -99,11 +121,17 @@ class Source:
|
||||
page_str = property(fget=lambda self: self.source_type.value)
|
||||
|
||||
|
||||
class SourceTypeSorting(TypedDict):
|
||||
sort_key: Callable[[SourceType], Any]
|
||||
reverse: bool
|
||||
only_with_page: bool
|
||||
|
||||
|
||||
class SourceCollection:
|
||||
__change_version__ = generate_id()
|
||||
|
||||
_indexed_sources: Dict[str, Source]
|
||||
_page_to_source_list: Dict[SourceType, List[Source]]
|
||||
_sources_by_type: Dict[SourceType, List[Source]]
|
||||
|
||||
def __init__(self, data: Optional[Iterable[Source]] = None, **kwargs):
|
||||
self._page_to_source_list = defaultdict(list)
|
||||
@ -111,15 +139,54 @@ class SourceCollection:
|
||||
|
||||
self.extend(data or [])
|
||||
|
||||
def has_source_page(self, *source_pages: SourceType) -> bool:
|
||||
return any(source_page in self._page_to_source_list for source_page in source_pages)
|
||||
def source_types(
|
||||
self,
|
||||
only_with_page: bool = False,
|
||||
sort_key = lambda page: page.name,
|
||||
reverse: bool = False
|
||||
) -> Iterable[SourceType]:
|
||||
"""
|
||||
Returns a list of all source types contained in this source collection.
|
||||
|
||||
def get_sources(self, *source_pages: List[Source]) -> Generator[Source]:
|
||||
if not len(source_pages):
|
||||
source_pages = self.source_pages
|
||||
Args:
|
||||
only_with_page (bool, optional): If True, only returns source types that have a page, meaning you can download from them.
|
||||
sort_key (function, optional): A function that defines the sorting key for the source types. Defaults to lambda page: page.name.
|
||||
reverse (bool, optional): If True, sorts the source types in reverse order. Defaults to False.
|
||||
|
||||
for page in source_pages:
|
||||
yield from self._page_to_source_list[page]
|
||||
Returns:
|
||||
Iterable[SourceType]: A list of source types.
|
||||
"""
|
||||
|
||||
source_types: List[SourceType] = self._page_to_source_list.keys()
|
||||
if only_with_page:
|
||||
source_types = filter(lambda st: st.has_page, source_types)
|
||||
|
||||
return sorted(
|
||||
source_types,
|
||||
key=sort_key,
|
||||
reverse=reverse
|
||||
)
|
||||
|
||||
def get_sources(self, *source_types: List[SourceType], source_type_sorting: SourceTypeSorting = None) -> Generator[Source]:
|
||||
"""
|
||||
Retrieves sources based on the provided source types and source type sorting.
|
||||
|
||||
Args:
|
||||
*source_types (List[Source]): Variable number of source types to filter the sources.
|
||||
source_type_sorting (SourceTypeSorting): Sorting criteria for the source types. This is only relevant if no source types are provided.
|
||||
|
||||
Yields:
|
||||
Generator[Source]: A generator that yields the sources based on the provided filters.
|
||||
|
||||
Returns:
|
||||
None
|
||||
"""
|
||||
if not len(source_types):
|
||||
source_type_sorting = source_type_sorting or {}
|
||||
source_types = self.source_types(**source_type_sorting)
|
||||
|
||||
for source_type in source_types:
|
||||
yield from self._page_to_source_list[source_type]
|
||||
|
||||
def append(self, source: Source):
|
||||
if source is None:
|
||||
@ -156,10 +223,6 @@ class SourceCollection:
|
||||
def __merge__(self, other: SourceCollection, **kwargs):
|
||||
self.extend(other)
|
||||
|
||||
@property
|
||||
def source_pages(self) -> Iterable[SourceType]:
|
||||
return sorted(self._page_to_source_list.keys(), key=lambda page: page.value)
|
||||
|
||||
@property
|
||||
def hash_url_list(self) -> List[str]:
|
||||
return [hash_url(source.url) for source in self.get_sources()]
|
||||
@ -170,7 +233,7 @@ class SourceCollection:
|
||||
|
||||
@property
|
||||
def homepage_list(self) -> List[str]:
|
||||
return [source.homepage for source in self.source_pages]
|
||||
return [source_type.homepage for source_type in self._sources_by_type.keys()]
|
||||
|
||||
def indexing_values(self) -> Generator[Tuple[str, str], None, None]:
|
||||
for index in self._indexed_sources:
|
||||
|
@ -246,8 +246,6 @@ class Page:
|
||||
else:
|
||||
output(f'- {target.file_path}', color=BColors.GREY)
|
||||
|
||||
if not song.source_collection.has_source_page(self.SOURCE_TYPE):
|
||||
return DownloadResult(error_message=f"No {self.__class__.__name__} source found for {song.option_string}.")
|
||||
|
||||
sources = song.source_collection.get_sources(self.SOURCE_TYPE)
|
||||
|
||||
@ -264,15 +262,15 @@ class Page:
|
||||
r.merge(self._post_process_targets(
|
||||
song=song,
|
||||
temp_target=temp_target,
|
||||
interval_list=skip_intervals,
|
||||
skip_intervals=skip_intervals,
|
||||
found_on_disc=found_on_disc,
|
||||
))
|
||||
|
||||
return r
|
||||
|
||||
def _post_process_targets(self, song: Song, temp_target: Target, interval_list: List, found_on_disc: bool) -> DownloadResult:
|
||||
def _post_process_targets(self, song: Song, temp_target: Target, skip_intervals: List, found_on_disc: bool) -> DownloadResult:
|
||||
if not found_on_disc or self.download_options.process_audio_if_found:
|
||||
correct_codec(temp_target, interval_list=interval_list)
|
||||
correct_codec(temp_target, skip_intervals=skip_intervals)
|
||||
|
||||
self.post_process_hook(song, temp_target)
|
||||
|
||||
@ -288,7 +286,7 @@ class Page:
|
||||
r.add_target(target)
|
||||
|
||||
temp_target.delete()
|
||||
r.sponsor_segments += len(interval_list)
|
||||
r.sponsor_segments += len(skip_intervals)
|
||||
|
||||
return r
|
||||
|
||||
|
@ -51,7 +51,6 @@ class BandcampTypes(Enum):
|
||||
|
||||
class Bandcamp(Page):
|
||||
SOURCE_TYPE = ALL_SOURCE_TYPES.BANDCAMP
|
||||
LOGGER = logging_settings["bandcamp_logger"]
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.connection: Connection = Connection(
|
||||
|
@ -41,8 +41,6 @@ class YouTube(SuperYouTube):
|
||||
# CHANGE
|
||||
SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE
|
||||
|
||||
NO_ADDITIONAL_DATA_FROM_SONG = False
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.connection: Connection = Connection(
|
||||
host=get_invidious_url(),
|
||||
|
Loading…
Reference in New Issue
Block a user