draft:
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful

This commit is contained in:
2024-05-23 17:27:24 +02:00
parent cd2e7d7173
commit 906ddb679d
10 changed files with 76 additions and 596 deletions

View File

@@ -1,255 +0,0 @@
import logging
import re
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple, Type
from ..audio import correct_codec, write_metadata_to_target
from ..objects import Album, Artist, Collection
from ..objects import DatabaseObject as DataObject
from ..objects import Label, Options, Song, Source, Target
from ..pages import get_pages, scan_for_pages
from ..utils import BColors, output
from ..utils.config import main_settings, youtube_settings
from ..utils.enums import ALL_SOURCE_TYPES, SourceType
from ..utils.exception import MKMissingNameException
from ..utils.exception.download import UrlNotFoundException
from ..utils.path_manager import LOCATIONS
from ..utils.shared import DEBUG_PAGES
from ..utils.string_processing import fit_to_file_system
from ..utils.support_classes.download_result import DownloadResult
from ..utils.support_classes.query import Query
from . import DownloadOptions, FetchOptions
from .results import SearchResults
fetch_map = {
Song: "fetch_song",
Album: "fetch_album",
Artist: "fetch_artist",
Label: "fetch_label",
}
class Pages:
def __init__(self, download_options: DownloadOptions = None, fetch_options: FetchOptions = None, **kwargs):
self.LOGGER = logging.getLogger("download")
self.download_options: DownloadOptions = download_options or DownloadOptions()
self.fetch_options: FetchOptions = fetch_options or FetchOptions()
scan_for_pages(download_options=self.download_options, fetch_options=self.fetch_options, **kwargs)
def search(self, query: Query) -> SearchResults:
result = SearchResults()
for page in get_pages():
result.add(
page=type(page),
search_result=page.search(query=query)
)
return result
def fetch_details(self, data_object: DataObject, stop_at_level: int = 1, **kwargs) -> DataObject:
source: Source
for source in data_object.source_collection.get_sources(source_type_sorting={
"only_with_page": True,
}):
new_data_object = self.fetch_from_source(source=source, stop_at_level=stop_at_level)
if new_data_object is not None:
data_object.merge(new_data_object)
return data_object
def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]:
if not source.has_page:
return None
source_type = source.page.get_source_type(source=source)
if source_type is None:
self.LOGGER.debug(f"Could not determine source type for {source}.")
return None
func = getattr(source.page, fetch_map[source_type])
# fetching the data object and marking it as fetched
data_object: DataObject = func(source=source, **kwargs)
data_object.mark_as_fetched(source.hash_url)
return data_object
def fetch_from_url(self, url: str) -> Optional[DataObject]:
source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
if source is None:
return None
return self.fetch_from_source(source=source)
def _skip_object(self, data_object: DataObject) -> bool:
if isinstance(data_object, Album):
if not self.download_options.download_all and data_object.album_type in self.download_options.album_type_blacklist:
return True
return False
def download(self, data_object: DataObject, genre: str, **kwargs) -> DownloadResult:
# fetch the given object
self.fetch_details(data_object)
output(f"\nDownloading {data_object.option_string}...", color=BColors.BOLD)
# fetching all parent objects (e.g. if you only download a song)
if not kwargs.get("fetched_upwards", False):
to_fetch: List[DataObject] = [data_object]
while len(to_fetch) > 0:
new_to_fetch = []
for d in to_fetch:
if self._skip_object(d):
continue
self.fetch_details(d)
for c in d.get_parent_collections():
new_to_fetch.extend(c)
to_fetch = new_to_fetch
kwargs["fetched_upwards"] = True
# download all children
download_result: DownloadResult = DownloadResult()
for c in data_object.get_child_collections():
for d in c:
if self._skip_object(d):
continue
download_result.merge(self.download(d, genre, **kwargs))
# actually download if the object is a song
if isinstance(data_object, Song):
"""
TODO
add the traced artist and album to the naming.
I am able to do that, because duplicate values are removed later on.
"""
self._download_song(data_object, naming={
"genre": [genre],
"audio_format": [main_settings["audio_format"]],
})
return download_result
def _extract_fields_from_template(self, path_template: str) -> Set[str]:
return set(re.findall(r"{([^}]+)}", path_template))
def _parse_path_template(self, path_template: str, naming: Dict[str, List[str]]) -> str:
field_names: Set[str] = self._extract_fields_from_template(path_template)
for field in field_names:
if len(naming[field]) == 0:
raise MKMissingNameException(f"Missing field for {field}.")
path_template = path_template.replace(f"{{{field}}}", naming[field][0])
return path_template
def _download_song(self, song: Song, naming: dict) -> DownloadOptions:
"""
TODO
Search the song in the file system.
"""
r = DownloadResult(total=1)
# pre process the data recursively
song.compile()
# manage the naming
naming: Dict[str, List[str]] = defaultdict(list, naming)
naming["song"].append(song.title_value)
naming["isrc"].append(song.isrc)
naming["album"].extend(a.title_value for a in song.album_collection)
naming["album_type"].extend(a.album_type.value for a in song.album_collection)
naming["artist"].extend(a.name for a in song.artist_collection)
naming["artist"].extend(a.name for a in song.feature_artist_collection)
for a in song.album_collection:
naming["label"].extend([l.title_value for l in a.label_collection])
# removing duplicates from the naming, and process the strings
for key, value in naming.items():
# https://stackoverflow.com/a/17016257
naming[key] = list(dict.fromkeys(value))
song.genre = naming["genre"][0]
# manage the targets
tmp: Target = Target.temp(file_extension=main_settings["audio_format"])
song.target_collection.append(Target(
relative_to_music_dir=True,
file_path=Path(
self._parse_path_template(main_settings["download_path"], naming=naming),
self._parse_path_template(main_settings["download_file"], naming=naming),
)
))
for target in song.target_collection:
if target.exists:
output(f'{target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY)
r.found_on_disk += 1
if not self.download_options.download_again_if_found:
target.copy_content(tmp)
else:
target.create_path()
output(f'{target.file_path}', color=BColors.GREY)
# this streams from every available source until something succeeds, setting the skip intervals to the values of the according source
used_source: Optional[Source] = None
skip_intervals: List[Tuple[float, float]] = []
for source in song.source_collection.get_sources(source_type_sorting={
"only_with_page": True,
"sort_key": lambda page: page.download_priority,
"reverse": True,
}):
if tmp.exists:
break
used_source = source
streaming_results = source.page.download_song_to_target(source=source, target=tmp, desc="download")
skip_intervals = source.page.get_skip_intervals(song=song, source=source)
# if something has been downloaded but it somehow failed, delete the file
if streaming_results.is_fatal_error and tmp.exists:
tmp.delete()
# if everything went right, the file should exist now
if not tmp.exists:
if used_source is None:
r.error_message = f"No source found for {song.option_string}."
else:
r.error_message = f"Something went wrong downloading {song.option_string}."
return r
# post process the audio
found_on_disk = used_source is None
if not found_on_disk or self.download_options.process_audio_if_found:
correct_codec(target=tmp, skip_intervals=skip_intervals)
r.sponsor_segments = len(skip_intervals)
if used_source is not None:
used_source.page.post_process_hook(song=song, temp_target=tmp)
if not found_on_disk or self.download_options.process_metadata_if_found:
write_metadata_to_target(metadata=song.metadata, target=tmp, song=song)
# copy the tmp target to the final locations
for target in song.target_collection:
tmp.copy_content(target)
tmp.delete()
return r
def fetch_url(self, url: str, **kwargs) -> DataObject:
source = Source.match_url(url, ALL_SOURCE_TYPES.MANUAL)
if source is None or source.page is None:
raise UrlNotFoundException(url=url)
return source.page.fetch_object_from_source(source=source, **kwargs)

View File

@@ -1,8 +1,8 @@
from typing import Tuple, Type, Dict, List, Generator, Union
from dataclasses import dataclass
from typing import Dict, Generator, List, Tuple, Type, Union
from ..objects import DatabaseObject
from ..pages import Page, EncyclopaediaMetallum, Musify
from . import Page
@dataclass