Compare commits
	
		
			3 Commits
		
	
	
		
			da8887b279...ac6c513d56
		
	
| Author | SHA1 | Date | |
|---|---|---|---|
| | ac6c513d56 | | |
| | cc14253239 | | |
| | 14f986a497 | | |

| @@ -10,12 +10,12 @@ from ..objects import Target | ||||
| LOGGER = logging_settings["codex_logger"] | ||||
|  | ||||
|  | ||||
| def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], audio_format: str = main_settings["audio_format"], interval_list: List[Tuple[float, float]] = None): | ||||
| def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], audio_format: str = main_settings["audio_format"], skip_intervals: List[Tuple[float, float]] = None): | ||||
|     if not target.exists: | ||||
|         LOGGER.warning(f"Target doesn't exist: {target.file_path}") | ||||
|         return | ||||
|      | ||||
|     interval_list = interval_list or [] | ||||
|     skip_intervals = skip_intervals or [] | ||||
|  | ||||
|     bitrate_b = int(bitrate_kb / 1024) | ||||
|  | ||||
| @@ -29,7 +29,7 @@ def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], au | ||||
|      | ||||
|     start = 0 | ||||
|     next_start = 0 | ||||
|     for end, next_start in interval_list: | ||||
|     for end, next_start in skip_intervals: | ||||
|         aselect_list.append(f"between(t,{start},{end})") | ||||
|         start = next_start | ||||
|     aselect_list.append(f"gte(t,{next_start})") | ||||
|   | ||||
| @@ -178,8 +178,6 @@ class Downloader: | ||||
|         page_count = 0 | ||||
|         for option in self.current_results.formatted_generator(): | ||||
|             if isinstance(option, Option): | ||||
|                 _downloadable = self.pages.is_downloadable(option.music_object) | ||||
|  | ||||
|                 r = f"{BColors.GREY.value}{option.index:0{self.option_digits}}{BColors.ENDC.value} {option.music_object.option_string}" | ||||
|                 print(r) | ||||
|             else: | ||||
|   | ||||
| @@ -16,6 +16,7 @@ from ..objects import ( | ||||
|     Artist, | ||||
|     Label, | ||||
| ) | ||||
| from ..audio import write_metadata_to_target, correct_codec | ||||
| from ..utils.string_processing import fit_to_file_system | ||||
| from ..utils.config import youtube_settings, main_settings | ||||
| from ..utils.path_manager import LOCATIONS | ||||
| @@ -125,15 +126,10 @@ class Pages: | ||||
|         return data_object | ||||
|  | ||||
|     def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]: | ||||
|         page: Page = self._get_page_from_enum(source.source_type) | ||||
|         if page is None: | ||||
|         if not source.has_page: | ||||
|             return None | ||||
|          | ||||
|         # getting the appropriate function for the page and the object type | ||||
|         source_type = page.get_source_type(source) | ||||
|         if not hasattr(page, fetch_map[source_type]): | ||||
|             return None | ||||
|         func = getattr(page, fetch_map[source_type])(source=source, **kwargs) | ||||
|         func = getattr(source.page, fetch_map[source_type])(source=source, **kwargs) | ||||
|          | ||||
|         # fetching the data object and marking it as fetched | ||||
|         data_object: DataObject = func(source=source) | ||||
| @@ -147,15 +143,6 @@ class Pages: | ||||
|          | ||||
|         return self.fetch_from_source(source=source) | ||||
|      | ||||
|     def is_downloadable(self, music_object: DataObject) -> bool: | ||||
|         _page_types = set(self._source_to_page) | ||||
|         for src in music_object.source_collection.source_pages: | ||||
|             if src in self._source_to_page: | ||||
|                 _page_types.add(self._source_to_page[src]) | ||||
|  | ||||
|         audio_pages = self._audio_pages_set.intersection(_page_types) | ||||
|         return len(audio_pages) > 0 | ||||
|      | ||||
|     def _skip_object(self, data_object: DataObject) -> bool: | ||||
|         if isinstance(data_object, Album): | ||||
|             if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist: | ||||
| @@ -224,11 +211,6 @@ class Pages: | ||||
|  | ||||
|         return possible_parts | ||||
|  | ||||
|     def _get_pages_with_source(self, data_object: DataObject, sort_by_attribute: str = "DOWNLOAD_PRIORITY") -> List[Page]: | ||||
|         pages = [self._get_page_from_enum(s.source_type) for s in data_object.source_collection.get_sources()] | ||||
|         pages.sort(key=lambda p: getattr(p, sort_by_attribute), reverse=True) | ||||
|         return list(pages) | ||||
|  | ||||
|     def _download_song(self, song: Song, naming: dict) -> DownloadOptions: | ||||
|         """ | ||||
|         TODO | ||||
| @@ -257,7 +239,6 @@ class Pages: | ||||
|  | ||||
|         # manage the targets | ||||
|         tmp: Target = Target.temp(file_extension=main_settings["audio_format"]) | ||||
|         found_on_disc = False | ||||
|  | ||||
|         song.target_collection.append(Target( | ||||
|             relative_to_music_dir=True, | ||||
| @@ -269,18 +250,54 @@ class Pages: | ||||
|         for target in song.target_collection: | ||||
|             if target.exists(): | ||||
|                 output(f'- {target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY) | ||||
|  | ||||
|                 found_on_disc = True | ||||
|                 r.found_on_disk += 1 | ||||
|                 target.copy_content(tmp) | ||||
|  | ||||
|                 if self.download_options.download_again_if_found: | ||||
|                     target.copy_content(tmp) | ||||
|             else: | ||||
|                 target.create_parent_directories() | ||||
|                 output(f'- {target.file_path}', color=BColors.GREY) | ||||
|  | ||||
|         # actually download | ||||
|         for page in self._get_pages_with_source(song, sort_by_attribute="DOWNLOAD_PRIORITY"): | ||||
|             r = page.download_song_to_target(song, tmp, r) | ||||
|         # this streams from every available source until something succeeds, setting the skip intervals to the values of the according source | ||||
|         used_source: Optional[Source] = None | ||||
|         skip_intervals: List[Tuple[float, float]] = [] | ||||
|         for source in song.source_collection.get_sources(source_type_sorting={ | ||||
|             "only_with_page": True, | ||||
|             "sort_key": lambda page: page.download_priority, | ||||
|             "reverse": True, | ||||
|         }): | ||||
|             if tmp.exists: | ||||
|                 break | ||||
|  | ||||
|             used_source = source | ||||
|             streaming_results = source.page.download_song_to_target(source=source, target=tmp, desc="download") | ||||
|             skip_intervals = source.page.get_skip_intervals(song=song, source=source) | ||||
|  | ||||
|             # if something has been downloaded but it somehow failed, delete the file | ||||
|             if streaming_results.is_fatal_error and tmp.exists: | ||||
|                 tmp.delete() | ||||
|  | ||||
|         # if everything went right, the file should exist now | ||||
|         if not tmp.exists: | ||||
|             if used_source is None: | ||||
|                 r.error_message = f"No source found for {song.option_string}." | ||||
|             else: | ||||
|                 r.error_message = f"Something went wrong downloading {song.option_string}." | ||||
|             return r | ||||
|  | ||||
|         # post process the audio | ||||
|         found_on_disk = used_source is None | ||||
|         if not found_on_disk or self.download_options.process_audio_if_found: | ||||
|             correct_codec(target=tmp, skip_intervals=skip_intervals) | ||||
|             r.sponsor_segments = len(skip_intervals) | ||||
|  | ||||
|         if used_source is not None: | ||||
|             used_source.page.post_process_hook(song=song, temp_target=tmp) | ||||
|  | ||||
|         if not found_on_disc or self.download_options.process_metadata_if_found: | ||||
|             write_metadata_to_target(metadata=song.metadata, target=tmp, song=song) | ||||
|  | ||||
|         tmp.delete() | ||||
|         return r | ||||
|  | ||||
|     def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DataObject]: | ||||
|   | ||||
| @@ -2,7 +2,19 @@ from __future__ import annotations | ||||
|  | ||||
| from collections import defaultdict | ||||
| from enum import Enum | ||||
| from typing import List, Dict, Set, Tuple, Optional, Iterable, Generator | ||||
| from typing import ( | ||||
|     List,  | ||||
|     Dict,  | ||||
|     Set,  | ||||
|     Tuple,  | ||||
|     Optional,  | ||||
|     Iterable,  | ||||
|     Generator,  | ||||
|     TypedDict,  | ||||
|     Callable,  | ||||
|     Any, | ||||
|     TYPE_CHECKING | ||||
| ) | ||||
| from urllib.parse import urlparse, ParseResult | ||||
| from dataclasses import dataclass, field | ||||
| from functools import cached_property | ||||
| @@ -15,6 +27,8 @@ from ..utils.string_processing import hash_url, shorten_display_url | ||||
| from .metadata import Mapping, Metadata | ||||
| from .parents import OuterProxy | ||||
| from .collection import Collection | ||||
| if TYPE_CHECKING: | ||||
|     from ..pages.abstract import Page | ||||
|  | ||||
|  | ||||
|  | ||||
| @@ -30,10 +44,6 @@ class Source: | ||||
|     def __post_init__(self): | ||||
|         self.referrer_page = self.referrer_page or self.source_type | ||||
|  | ||||
|     @property | ||||
|     def parsed_url(self) -> ParseResult: | ||||
|         return urlparse(self.url) | ||||
|  | ||||
|     @classmethod | ||||
|     def match_url(cls, url: str, referrer_page: SourceType) -> Optional[Source]: | ||||
|         """ | ||||
| @@ -77,6 +87,18 @@ class Source: | ||||
|         if url.startswith("https://myspace.com"): | ||||
|             return cls(SourceType.MYSPACE, url, referrer_page=referrer_page) | ||||
|  | ||||
|     @property | ||||
|     def has_page(self) -> bool: | ||||
|         return self.source_type.page is not None | ||||
|      | ||||
|     @property | ||||
|     def page(self) -> Page: | ||||
|         return self.source_type.page | ||||
|  | ||||
|     @property | ||||
|     def parsed_url(self) -> ParseResult: | ||||
|         return urlparse(self.url) | ||||
|  | ||||
|     @property | ||||
|     def hash_url(self) -> str: | ||||
|         return hash_url(self.url) | ||||
| @@ -99,11 +121,17 @@ class Source: | ||||
|     page_str = property(fget=lambda self: self.source_type.value) | ||||
|  | ||||
|  | ||||
| class SourceTypeSorting(TypedDict): | ||||
|     sort_key: Callable[[SourceType], Any] | ||||
|     reverse: bool | ||||
|     only_with_page: bool | ||||
|  | ||||
|  | ||||
| class SourceCollection: | ||||
|     __change_version__ = generate_id() | ||||
|  | ||||
|     _indexed_sources: Dict[str, Source] | ||||
|     _page_to_source_list: Dict[SourceType, List[Source]] | ||||
|     _sources_by_type: Dict[SourceType, List[Source]] | ||||
|  | ||||
|     def __init__(self, data: Optional[Iterable[Source]] = None, **kwargs): | ||||
|         self._page_to_source_list = defaultdict(list) | ||||
| @@ -111,15 +139,54 @@ class SourceCollection: | ||||
|  | ||||
|         self.extend(data or []) | ||||
|  | ||||
|     def has_source_page(self, *source_pages: SourceType) -> bool: | ||||
|         return any(source_page in self._page_to_source_list for source_page in source_pages) | ||||
|     def source_types( | ||||
|         self,  | ||||
|         only_with_page: bool = False,  | ||||
|         sort_key = lambda page: page.name,  | ||||
|         reverse: bool = False | ||||
|     ) -> Iterable[SourceType]: | ||||
|         """ | ||||
|         Returns a list of all source types contained in this source collection. | ||||
|  | ||||
|     def get_sources(self, *source_pages: List[Source]) -> Generator[Source]: | ||||
|         if not len(source_pages): | ||||
|             source_pages = self.source_pages | ||||
|         Args: | ||||
|             only_with_page (bool, optional): If True, only returns source types that have a page, meaning you can download from them. | ||||
|             sort_key (function, optional): A function that defines the sorting key for the source types. Defaults to lambda page: page.name. | ||||
|             reverse (bool, optional): If True, sorts the source types in reverse order. Defaults to False. | ||||
|  | ||||
|         for page in source_pages: | ||||
|             yield from self._page_to_source_list[page] | ||||
|         Returns: | ||||
|             Iterable[SourceType]: A list of source types. | ||||
|         """ | ||||
|  | ||||
|         source_types: List[SourceType] = self._page_to_source_list.keys() | ||||
|         if only_with_page: | ||||
|             source_types = filter(lambda st: st.has_page, source_types) | ||||
|  | ||||
|         return sorted( | ||||
|             source_types,  | ||||
|             key=sort_key,  | ||||
|             reverse=reverse | ||||
|         ) | ||||
|  | ||||
|     def get_sources(self, *source_types: List[SourceType], source_type_sorting: SourceTypeSorting = None) -> Generator[Source]: | ||||
|             """ | ||||
|             Retrieves sources based on the provided source types and source type sorting. | ||||
|  | ||||
|             Args: | ||||
|                 *source_types (List[Source]): Variable number of source types to filter the sources. | ||||
|                 source_type_sorting (SourceTypeSorting): Sorting criteria for the source types. This is only relevant if no source types are provided. | ||||
|  | ||||
|             Yields: | ||||
|                 Generator[Source]: A generator that yields the sources based on the provided filters. | ||||
|  | ||||
|             Returns: | ||||
|                 None | ||||
|             """ | ||||
|             if not len(source_types): | ||||
|                 source_type_sorting = source_type_sorting or {} | ||||
|                 source_types = self.source_types(**source_type_sorting) | ||||
|  | ||||
|             for source_type in source_types: | ||||
|                 yield from self._page_to_source_list[source_type] | ||||
|  | ||||
|     def append(self, source: Source): | ||||
|         if source is None: | ||||
| @@ -156,10 +223,6 @@ class SourceCollection: | ||||
|     def __merge__(self, other: SourceCollection, **kwargs): | ||||
|         self.extend(other) | ||||
|          | ||||
|     @property | ||||
|     def source_pages(self) -> Iterable[SourceType]: | ||||
|         return sorted(self._page_to_source_list.keys(), key=lambda page: page.value) | ||||
|  | ||||
|     @property | ||||
|     def hash_url_list(self) -> List[str]: | ||||
|         return [hash_url(source.url) for source in self.get_sources()] | ||||
| @@ -170,7 +233,7 @@ class SourceCollection: | ||||
|  | ||||
|     @property | ||||
|     def homepage_list(self) -> List[str]: | ||||
|         return [source.homepage for source in self.source_pages] | ||||
|         return [source_type.homepage for source_type in self._sources_by_type.keys()] | ||||
|  | ||||
|     def indexing_values(self) -> Generator[Tuple[str, str], None, None]: | ||||
|         for index in self._indexed_sources: | ||||
|   | ||||
| @@ -246,8 +246,6 @@ class Page: | ||||
|             else: | ||||
|                 output(f'- {target.file_path}', color=BColors.GREY) | ||||
|  | ||||
|         if not song.source_collection.has_source_page(self.SOURCE_TYPE): | ||||
|             return DownloadResult(error_message=f"No {self.__class__.__name__} source found for {song.option_string}.") | ||||
|  | ||||
|         sources = song.source_collection.get_sources(self.SOURCE_TYPE) | ||||
|  | ||||
| @@ -264,15 +262,15 @@ class Page: | ||||
|             r.merge(self._post_process_targets( | ||||
|                 song=song,  | ||||
|                 temp_target=temp_target, | ||||
|                 interval_list=skip_intervals, | ||||
|                 skip_intervals=skip_intervals, | ||||
|                 found_on_disc=found_on_disc, | ||||
|             )) | ||||
|  | ||||
|         return r | ||||
|  | ||||
|     def _post_process_targets(self, song: Song, temp_target: Target, interval_list: List, found_on_disc: bool) -> DownloadResult: | ||||
|     def _post_process_targets(self, song: Song, temp_target: Target, skip_intervals: List, found_on_disc: bool) -> DownloadResult: | ||||
|         if not found_on_disc or self.download_options.process_audio_if_found: | ||||
|             correct_codec(temp_target, interval_list=interval_list) | ||||
|             correct_codec(temp_target, skip_intervals=skip_intervals) | ||||
|  | ||||
|         self.post_process_hook(song, temp_target) | ||||
|  | ||||
| @@ -288,7 +286,7 @@ class Page: | ||||
|             r.add_target(target) | ||||
|  | ||||
|         temp_target.delete() | ||||
|         r.sponsor_segments += len(interval_list) | ||||
|         r.sponsor_segments += len(skip_intervals) | ||||
|  | ||||
|         return r | ||||
|  | ||||
|   | ||||
| @@ -51,7 +51,6 @@ class BandcampTypes(Enum): | ||||
|  | ||||
| class Bandcamp(Page): | ||||
|     SOURCE_TYPE = ALL_SOURCE_TYPES.BANDCAMP | ||||
|     LOGGER = logging_settings["bandcamp_logger"] | ||||
|  | ||||
|     def __init__(self, *args, **kwargs): | ||||
|         self.connection: Connection = Connection( | ||||
|   | ||||
| @@ -41,8 +41,6 @@ class YouTube(SuperYouTube): | ||||
|     # CHANGE | ||||
|     SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE | ||||
|  | ||||
|     NO_ADDITIONAL_DATA_FROM_SONG = False | ||||
|  | ||||
|     def __init__(self, *args, **kwargs): | ||||
|         self.connection: Connection = Connection( | ||||
|             host=get_invidious_url(), | ||||
|   | ||||
		Reference in New Issue
	
	Block a user