Compare commits
3 Commits
da8887b279
...
ac6c513d56
Author | SHA1 | Date | |
---|---|---|---|
ac6c513d56 | |||
cc14253239 | |||
14f986a497 |
@ -10,12 +10,12 @@ from ..objects import Target
|
|||||||
LOGGER = logging_settings["codex_logger"]
|
LOGGER = logging_settings["codex_logger"]
|
||||||
|
|
||||||
|
|
||||||
def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], audio_format: str = main_settings["audio_format"], interval_list: List[Tuple[float, float]] = None):
|
def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], audio_format: str = main_settings["audio_format"], skip_intervals: List[Tuple[float, float]] = None):
|
||||||
if not target.exists:
|
if not target.exists:
|
||||||
LOGGER.warning(f"Target doesn't exist: {target.file_path}")
|
LOGGER.warning(f"Target doesn't exist: {target.file_path}")
|
||||||
return
|
return
|
||||||
|
|
||||||
interval_list = interval_list or []
|
skip_intervals = skip_intervals or []
|
||||||
|
|
||||||
bitrate_b = int(bitrate_kb / 1024)
|
bitrate_b = int(bitrate_kb / 1024)
|
||||||
|
|
||||||
@ -29,7 +29,7 @@ def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], au
|
|||||||
|
|
||||||
start = 0
|
start = 0
|
||||||
next_start = 0
|
next_start = 0
|
||||||
for end, next_start in interval_list:
|
for end, next_start in skip_intervals:
|
||||||
aselect_list.append(f"between(t,{start},{end})")
|
aselect_list.append(f"between(t,{start},{end})")
|
||||||
start = next_start
|
start = next_start
|
||||||
aselect_list.append(f"gte(t,{next_start})")
|
aselect_list.append(f"gte(t,{next_start})")
|
||||||
|
@ -178,8 +178,6 @@ class Downloader:
|
|||||||
page_count = 0
|
page_count = 0
|
||||||
for option in self.current_results.formatted_generator():
|
for option in self.current_results.formatted_generator():
|
||||||
if isinstance(option, Option):
|
if isinstance(option, Option):
|
||||||
_downloadable = self.pages.is_downloadable(option.music_object)
|
|
||||||
|
|
||||||
r = f"{BColors.GREY.value}{option.index:0{self.option_digits}}{BColors.ENDC.value} {option.music_object.option_string}"
|
r = f"{BColors.GREY.value}{option.index:0{self.option_digits}}{BColors.ENDC.value} {option.music_object.option_string}"
|
||||||
print(r)
|
print(r)
|
||||||
else:
|
else:
|
||||||
|
@ -16,6 +16,7 @@ from ..objects import (
|
|||||||
Artist,
|
Artist,
|
||||||
Label,
|
Label,
|
||||||
)
|
)
|
||||||
|
from ..audio import write_metadata_to_target, correct_codec
|
||||||
from ..utils.string_processing import fit_to_file_system
|
from ..utils.string_processing import fit_to_file_system
|
||||||
from ..utils.config import youtube_settings, main_settings
|
from ..utils.config import youtube_settings, main_settings
|
||||||
from ..utils.path_manager import LOCATIONS
|
from ..utils.path_manager import LOCATIONS
|
||||||
@ -125,15 +126,10 @@ class Pages:
|
|||||||
return data_object
|
return data_object
|
||||||
|
|
||||||
def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]:
|
def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]:
|
||||||
page: Page = self._get_page_from_enum(source.source_type)
|
if not source.has_page:
|
||||||
if page is None:
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# getting the appropriate function for the page and the object type
|
func = getattr(source.page, fetch_map[source_type])(source=source, **kwargs)
|
||||||
source_type = page.get_source_type(source)
|
|
||||||
if not hasattr(page, fetch_map[source_type]):
|
|
||||||
return None
|
|
||||||
func = getattr(page, fetch_map[source_type])(source=source, **kwargs)
|
|
||||||
|
|
||||||
# fetching the data object and marking it as fetched
|
# fetching the data object and marking it as fetched
|
||||||
data_object: DataObject = func(source=source)
|
data_object: DataObject = func(source=source)
|
||||||
@ -147,15 +143,6 @@ class Pages:
|
|||||||
|
|
||||||
return self.fetch_from_source(source=source)
|
return self.fetch_from_source(source=source)
|
||||||
|
|
||||||
def is_downloadable(self, music_object: DataObject) -> bool:
|
|
||||||
_page_types = set(self._source_to_page)
|
|
||||||
for src in music_object.source_collection.source_pages:
|
|
||||||
if src in self._source_to_page:
|
|
||||||
_page_types.add(self._source_to_page[src])
|
|
||||||
|
|
||||||
audio_pages = self._audio_pages_set.intersection(_page_types)
|
|
||||||
return len(audio_pages) > 0
|
|
||||||
|
|
||||||
def _skip_object(self, data_object: DataObject) -> bool:
|
def _skip_object(self, data_object: DataObject) -> bool:
|
||||||
if isinstance(data_object, Album):
|
if isinstance(data_object, Album):
|
||||||
if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist:
|
if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist:
|
||||||
@ -224,11 +211,6 @@ class Pages:
|
|||||||
|
|
||||||
return possible_parts
|
return possible_parts
|
||||||
|
|
||||||
def _get_pages_with_source(self, data_object: DataObject, sort_by_attribute: str = "DOWNLOAD_PRIORITY") -> List[Page]:
|
|
||||||
pages = [self._get_page_from_enum(s.source_type) for s in data_object.source_collection.get_sources()]
|
|
||||||
pages.sort(key=lambda p: getattr(p, sort_by_attribute), reverse=True)
|
|
||||||
return list(pages)
|
|
||||||
|
|
||||||
def _download_song(self, song: Song, naming: dict) -> DownloadOptions:
|
def _download_song(self, song: Song, naming: dict) -> DownloadOptions:
|
||||||
"""
|
"""
|
||||||
TODO
|
TODO
|
||||||
@ -257,7 +239,6 @@ class Pages:
|
|||||||
|
|
||||||
# manage the targets
|
# manage the targets
|
||||||
tmp: Target = Target.temp(file_extension=main_settings["audio_format"])
|
tmp: Target = Target.temp(file_extension=main_settings["audio_format"])
|
||||||
found_on_disc = False
|
|
||||||
|
|
||||||
song.target_collection.append(Target(
|
song.target_collection.append(Target(
|
||||||
relative_to_music_dir=True,
|
relative_to_music_dir=True,
|
||||||
@ -269,18 +250,54 @@ class Pages:
|
|||||||
for target in song.target_collection:
|
for target in song.target_collection:
|
||||||
if target.exists():
|
if target.exists():
|
||||||
output(f'- {target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY)
|
output(f'- {target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY)
|
||||||
|
|
||||||
found_on_disc = True
|
|
||||||
r.found_on_disk += 1
|
r.found_on_disk += 1
|
||||||
target.copy_content(tmp)
|
|
||||||
|
if self.download_options.download_again_if_found:
|
||||||
|
target.copy_content(tmp)
|
||||||
else:
|
else:
|
||||||
target.create_parent_directories()
|
target.create_parent_directories()
|
||||||
output(f'- {target.file_path}', color=BColors.GREY)
|
output(f'- {target.file_path}', color=BColors.GREY)
|
||||||
|
|
||||||
# actually download
|
# this streams from every available source until something succeeds, setting the skip intervals to the values of the according source
|
||||||
for page in self._get_pages_with_source(song, sort_by_attribute="DOWNLOAD_PRIORITY"):
|
used_source: Optional[Source] = None
|
||||||
r = page.download_song_to_target(song, tmp, r)
|
skip_intervals: List[Tuple[float, float]] = []
|
||||||
|
for source in song.source_collection.get_sources(source_type_sorting={
|
||||||
|
"only_with_page": True,
|
||||||
|
"sort_key": lambda page: page.download_priority,
|
||||||
|
"reverse": True,
|
||||||
|
}):
|
||||||
|
if tmp.exists:
|
||||||
|
break
|
||||||
|
|
||||||
|
used_source = source
|
||||||
|
streaming_results = source.page.download_song_to_target(source=source, target=tmp, desc="download")
|
||||||
|
skip_intervals = source.page.get_skip_intervals(song=song, source=source)
|
||||||
|
|
||||||
|
# if something has been downloaded but it somehow failed, delete the file
|
||||||
|
if streaming_results.is_fatal_error and tmp.exists:
|
||||||
|
tmp.delete()
|
||||||
|
|
||||||
|
# if everything went right, the file should exist now
|
||||||
|
if not tmp.exists:
|
||||||
|
if used_source is None:
|
||||||
|
r.error_message = f"No source found for {song.option_string}."
|
||||||
|
else:
|
||||||
|
r.error_message = f"Something went wrong downloading {song.option_string}."
|
||||||
|
return r
|
||||||
|
|
||||||
|
# post process the audio
|
||||||
|
found_on_disk = used_source is None
|
||||||
|
if not found_on_disk or self.download_options.process_audio_if_found:
|
||||||
|
correct_codec(target=tmp, skip_intervals=skip_intervals)
|
||||||
|
r.sponsor_segments = len(skip_intervals)
|
||||||
|
|
||||||
|
if used_source is not None:
|
||||||
|
used_source.page.post_process_hook(song=song, temp_target=tmp)
|
||||||
|
|
||||||
|
if not found_on_disc or self.download_options.process_metadata_if_found:
|
||||||
|
write_metadata_to_target(metadata=song.metadata, target=tmp, song=song)
|
||||||
|
|
||||||
|
tmp.delete()
|
||||||
return r
|
return r
|
||||||
|
|
||||||
def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DataObject]:
|
def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DataObject]:
|
||||||
|
@ -2,7 +2,19 @@ from __future__ import annotations
|
|||||||
|
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from typing import List, Dict, Set, Tuple, Optional, Iterable, Generator
|
from typing import (
|
||||||
|
List,
|
||||||
|
Dict,
|
||||||
|
Set,
|
||||||
|
Tuple,
|
||||||
|
Optional,
|
||||||
|
Iterable,
|
||||||
|
Generator,
|
||||||
|
TypedDict,
|
||||||
|
Callable,
|
||||||
|
Any,
|
||||||
|
TYPE_CHECKING
|
||||||
|
)
|
||||||
from urllib.parse import urlparse, ParseResult
|
from urllib.parse import urlparse, ParseResult
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from functools import cached_property
|
from functools import cached_property
|
||||||
@ -15,6 +27,8 @@ from ..utils.string_processing import hash_url, shorten_display_url
|
|||||||
from .metadata import Mapping, Metadata
|
from .metadata import Mapping, Metadata
|
||||||
from .parents import OuterProxy
|
from .parents import OuterProxy
|
||||||
from .collection import Collection
|
from .collection import Collection
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from ..pages.abstract import Page
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -30,10 +44,6 @@ class Source:
|
|||||||
def __post_init__(self):
|
def __post_init__(self):
|
||||||
self.referrer_page = self.referrer_page or self.source_type
|
self.referrer_page = self.referrer_page or self.source_type
|
||||||
|
|
||||||
@property
|
|
||||||
def parsed_url(self) -> ParseResult:
|
|
||||||
return urlparse(self.url)
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def match_url(cls, url: str, referrer_page: SourceType) -> Optional[Source]:
|
def match_url(cls, url: str, referrer_page: SourceType) -> Optional[Source]:
|
||||||
"""
|
"""
|
||||||
@ -77,6 +87,18 @@ class Source:
|
|||||||
if url.startswith("https://myspace.com"):
|
if url.startswith("https://myspace.com"):
|
||||||
return cls(SourceType.MYSPACE, url, referrer_page=referrer_page)
|
return cls(SourceType.MYSPACE, url, referrer_page=referrer_page)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def has_page(self) -> bool:
|
||||||
|
return self.source_type.page is not None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def page(self) -> Page:
|
||||||
|
return self.source_type.page
|
||||||
|
|
||||||
|
@property
|
||||||
|
def parsed_url(self) -> ParseResult:
|
||||||
|
return urlparse(self.url)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def hash_url(self) -> str:
|
def hash_url(self) -> str:
|
||||||
return hash_url(self.url)
|
return hash_url(self.url)
|
||||||
@ -99,11 +121,17 @@ class Source:
|
|||||||
page_str = property(fget=lambda self: self.source_type.value)
|
page_str = property(fget=lambda self: self.source_type.value)
|
||||||
|
|
||||||
|
|
||||||
|
class SourceTypeSorting(TypedDict):
|
||||||
|
sort_key: Callable[[SourceType], Any]
|
||||||
|
reverse: bool
|
||||||
|
only_with_page: bool
|
||||||
|
|
||||||
|
|
||||||
class SourceCollection:
|
class SourceCollection:
|
||||||
__change_version__ = generate_id()
|
__change_version__ = generate_id()
|
||||||
|
|
||||||
_indexed_sources: Dict[str, Source]
|
_indexed_sources: Dict[str, Source]
|
||||||
_page_to_source_list: Dict[SourceType, List[Source]]
|
_sources_by_type: Dict[SourceType, List[Source]]
|
||||||
|
|
||||||
def __init__(self, data: Optional[Iterable[Source]] = None, **kwargs):
|
def __init__(self, data: Optional[Iterable[Source]] = None, **kwargs):
|
||||||
self._page_to_source_list = defaultdict(list)
|
self._page_to_source_list = defaultdict(list)
|
||||||
@ -111,15 +139,54 @@ class SourceCollection:
|
|||||||
|
|
||||||
self.extend(data or [])
|
self.extend(data or [])
|
||||||
|
|
||||||
def has_source_page(self, *source_pages: SourceType) -> bool:
|
def source_types(
|
||||||
return any(source_page in self._page_to_source_list for source_page in source_pages)
|
self,
|
||||||
|
only_with_page: bool = False,
|
||||||
|
sort_key = lambda page: page.name,
|
||||||
|
reverse: bool = False
|
||||||
|
) -> Iterable[SourceType]:
|
||||||
|
"""
|
||||||
|
Returns a list of all source types contained in this source collection.
|
||||||
|
|
||||||
def get_sources(self, *source_pages: List[Source]) -> Generator[Source]:
|
Args:
|
||||||
if not len(source_pages):
|
only_with_page (bool, optional): If True, only returns source types that have a page, meaning you can download from them.
|
||||||
source_pages = self.source_pages
|
sort_key (function, optional): A function that defines the sorting key for the source types. Defaults to lambda page: page.name.
|
||||||
|
reverse (bool, optional): If True, sorts the source types in reverse order. Defaults to False.
|
||||||
|
|
||||||
for page in source_pages:
|
Returns:
|
||||||
yield from self._page_to_source_list[page]
|
Iterable[SourceType]: A list of source types.
|
||||||
|
"""
|
||||||
|
|
||||||
|
source_types: List[SourceType] = self._page_to_source_list.keys()
|
||||||
|
if only_with_page:
|
||||||
|
source_types = filter(lambda st: st.has_page, source_types)
|
||||||
|
|
||||||
|
return sorted(
|
||||||
|
source_types,
|
||||||
|
key=sort_key,
|
||||||
|
reverse=reverse
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_sources(self, *source_types: List[SourceType], source_type_sorting: SourceTypeSorting = None) -> Generator[Source]:
|
||||||
|
"""
|
||||||
|
Retrieves sources based on the provided source types and source type sorting.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
*source_types (List[Source]): Variable number of source types to filter the sources.
|
||||||
|
source_type_sorting (SourceTypeSorting): Sorting criteria for the source types. This is only relevant if no source types are provided.
|
||||||
|
|
||||||
|
Yields:
|
||||||
|
Generator[Source]: A generator that yields the sources based on the provided filters.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
None
|
||||||
|
"""
|
||||||
|
if not len(source_types):
|
||||||
|
source_type_sorting = source_type_sorting or {}
|
||||||
|
source_types = self.source_types(**source_type_sorting)
|
||||||
|
|
||||||
|
for source_type in source_types:
|
||||||
|
yield from self._page_to_source_list[source_type]
|
||||||
|
|
||||||
def append(self, source: Source):
|
def append(self, source: Source):
|
||||||
if source is None:
|
if source is None:
|
||||||
@ -156,10 +223,6 @@ class SourceCollection:
|
|||||||
def __merge__(self, other: SourceCollection, **kwargs):
|
def __merge__(self, other: SourceCollection, **kwargs):
|
||||||
self.extend(other)
|
self.extend(other)
|
||||||
|
|
||||||
@property
|
|
||||||
def source_pages(self) -> Iterable[SourceType]:
|
|
||||||
return sorted(self._page_to_source_list.keys(), key=lambda page: page.value)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def hash_url_list(self) -> List[str]:
|
def hash_url_list(self) -> List[str]:
|
||||||
return [hash_url(source.url) for source in self.get_sources()]
|
return [hash_url(source.url) for source in self.get_sources()]
|
||||||
@ -170,7 +233,7 @@ class SourceCollection:
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def homepage_list(self) -> List[str]:
|
def homepage_list(self) -> List[str]:
|
||||||
return [source.homepage for source in self.source_pages]
|
return [source_type.homepage for source_type in self._sources_by_type.keys()]
|
||||||
|
|
||||||
def indexing_values(self) -> Generator[Tuple[str, str], None, None]:
|
def indexing_values(self) -> Generator[Tuple[str, str], None, None]:
|
||||||
for index in self._indexed_sources:
|
for index in self._indexed_sources:
|
||||||
|
@ -246,8 +246,6 @@ class Page:
|
|||||||
else:
|
else:
|
||||||
output(f'- {target.file_path}', color=BColors.GREY)
|
output(f'- {target.file_path}', color=BColors.GREY)
|
||||||
|
|
||||||
if not song.source_collection.has_source_page(self.SOURCE_TYPE):
|
|
||||||
return DownloadResult(error_message=f"No {self.__class__.__name__} source found for {song.option_string}.")
|
|
||||||
|
|
||||||
sources = song.source_collection.get_sources(self.SOURCE_TYPE)
|
sources = song.source_collection.get_sources(self.SOURCE_TYPE)
|
||||||
|
|
||||||
@ -264,15 +262,15 @@ class Page:
|
|||||||
r.merge(self._post_process_targets(
|
r.merge(self._post_process_targets(
|
||||||
song=song,
|
song=song,
|
||||||
temp_target=temp_target,
|
temp_target=temp_target,
|
||||||
interval_list=skip_intervals,
|
skip_intervals=skip_intervals,
|
||||||
found_on_disc=found_on_disc,
|
found_on_disc=found_on_disc,
|
||||||
))
|
))
|
||||||
|
|
||||||
return r
|
return r
|
||||||
|
|
||||||
def _post_process_targets(self, song: Song, temp_target: Target, interval_list: List, found_on_disc: bool) -> DownloadResult:
|
def _post_process_targets(self, song: Song, temp_target: Target, skip_intervals: List, found_on_disc: bool) -> DownloadResult:
|
||||||
if not found_on_disc or self.download_options.process_audio_if_found:
|
if not found_on_disc or self.download_options.process_audio_if_found:
|
||||||
correct_codec(temp_target, interval_list=interval_list)
|
correct_codec(temp_target, skip_intervals=skip_intervals)
|
||||||
|
|
||||||
self.post_process_hook(song, temp_target)
|
self.post_process_hook(song, temp_target)
|
||||||
|
|
||||||
@ -288,7 +286,7 @@ class Page:
|
|||||||
r.add_target(target)
|
r.add_target(target)
|
||||||
|
|
||||||
temp_target.delete()
|
temp_target.delete()
|
||||||
r.sponsor_segments += len(interval_list)
|
r.sponsor_segments += len(skip_intervals)
|
||||||
|
|
||||||
return r
|
return r
|
||||||
|
|
||||||
|
@ -51,7 +51,6 @@ class BandcampTypes(Enum):
|
|||||||
|
|
||||||
class Bandcamp(Page):
|
class Bandcamp(Page):
|
||||||
SOURCE_TYPE = ALL_SOURCE_TYPES.BANDCAMP
|
SOURCE_TYPE = ALL_SOURCE_TYPES.BANDCAMP
|
||||||
LOGGER = logging_settings["bandcamp_logger"]
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
self.connection: Connection = Connection(
|
self.connection: Connection = Connection(
|
||||||
|
@ -41,8 +41,6 @@ class YouTube(SuperYouTube):
|
|||||||
# CHANGE
|
# CHANGE
|
||||||
SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE
|
SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE
|
||||||
|
|
||||||
NO_ADDITIONAL_DATA_FROM_SONG = False
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
self.connection: Connection = Connection(
|
self.connection: Connection = Connection(
|
||||||
host=get_invidious_url(),
|
host=get_invidious_url(),
|
||||||
|
Loading…
Reference in New Issue
Block a user