music-kraken-core/music_kraken/objects/source.py
Lars Noack bedd0fe819
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
fix: runtime errors
2024-05-15 13:16:11 +02:00

238 lines
7.6 KiB
Python

from __future__ import annotations
from collections import defaultdict
from enum import Enum
from typing import (
List,
Dict,
Set,
Tuple,
Optional,
Iterable,
Generator,
TypedDict,
Callable,
Any,
TYPE_CHECKING
)
from urllib.parse import urlparse, ParseResult
from dataclasses import dataclass, field
from functools import cached_property
from ..utils import generate_id
from ..utils.enums import SourceType, ALL_SOURCE_TYPES
from ..utils.config import youtube_settings
from ..utils.string_processing import hash_url, shorten_display_url
from .metadata import Mapping, Metadata
if TYPE_CHECKING:
from ..pages.abstract import Page
@dataclass
class Source:
    """A URL that points at some entity on one particular source.

    Besides the URL itself, a source remembers which source type it belongs
    to, which source type referred to it, an optional direct audio URL and a
    free-form dictionary of extra data.
    """

    source_type: SourceType
    url: str
    # Falls back to ``source_type`` in ``__post_init__`` when left empty.
    referrer_page: SourceType = None
    # When set, it is hashed and indexed in addition to ``url``.
    audio_url: Optional[str] = None
    # Arbitrary extra data; updated in place by ``__merge__``.
    additional_data: dict = field(default_factory=dict)

    def __post_init__(self):
        """Default the referrer to the source's own type if none was given."""
        if not self.referrer_page:
            self.referrer_page = self.source_type

    @classmethod
    def match_url(cls, url: str, referrer_page: SourceType) -> Optional[Source]:
        """Guess the source type of ``url`` and build a ``Source`` from it.

        Only use this when the source type is genuinely unknown: it walks a
        chain of URL checks, which is less efficient than constructing the
        ``Source`` with a known type directly.

        Args:
            url (str): The URL to classify.
            referrer_page (SourceType): Passed through to the created source.

        Returns:
            Optional[Source]: The matched source, or None if no check matched.
        """
        parsed = urlparse(url)
        host = parsed.netloc
        normalized = parsed.geturl()

        def build(source_type: SourceType) -> Source:
            return cls(source_type, normalized, referrer_page=referrer_page)

        if "musify" in host:
            return build(ALL_SOURCE_TYPES.MUSIFY)

        youtube_hosts = [known.netloc for known in youtube_settings['youtube_url']]
        if host in youtube_hosts:
            return build(ALL_SOURCE_TYPES.YOUTUBE)

        if normalized.startswith("https://www.deezer"):
            return build(ALL_SOURCE_TYPES.DEEZER)

        if normalized.startswith("https://open.spotify.com"):
            return build(ALL_SOURCE_TYPES.SPOTIFY)

        # Note: this looks at the whole URL, not only the host.
        if "bandcamp" in normalized:
            return build(ALL_SOURCE_TYPES.BANDCAMP)

        if "wikipedia" in host:
            return build(ALL_SOURCE_TYPES.WIKIPEDIA)

        if normalized.startswith("https://www.metal-archives.com/"):
            return build(ALL_SOURCE_TYPES.ENCYCLOPAEDIA_METALLUM)

        # the less important ones
        if normalized.startswith("https://www.facebook"):
            return build(ALL_SOURCE_TYPES.FACEBOOK)

        if normalized.startswith("https://www.instagram"):
            return build(ALL_SOURCE_TYPES.INSTAGRAM)

        if normalized.startswith("https://twitter"):
            return build(ALL_SOURCE_TYPES.TWITTER)

        if normalized.startswith("https://myspace.com"):
            return build(ALL_SOURCE_TYPES.MYSPACE)

        return None

    @property
    def has_page(self) -> bool:
        """Whether the source type of this source has a page attached."""
        attached_page = self.source_type.page
        return attached_page is not None

    @property
    def page(self) -> Page:
        """The page attached to this source's type."""
        return self.source_type.page

    @property
    def parsed_url(self) -> ParseResult:
        """The URL split into components; parsed anew on every access."""
        return urlparse(self.url)

    @property
    def hash_url(self) -> str:
        """The result of ``hash_url`` applied to this source's URL."""
        return hash_url(self.url)

    @property
    def indexing_values(self) -> list:
        """Hashes under which this source is indexed.

        Always holds the hashed ``url``; the hashed ``audio_url`` follows
        when an audio URL is set.
        """
        hashes = [hash_url(self.url)]
        if self.audio_url:
            hashes += [hash_url(self.audio_url)]
        return hashes

    def __repr__(self) -> str:
        return f"Src({self.source_type.value}: {shorten_display_url(self.url)})"

    def __merge__(self, other: Source, **kwargs):
        """Fold ``other`` into this source.

        An already known audio URL is kept; otherwise the one from ``other``
        is adopted. Entries of ``other.additional_data`` overwrite entries
        with the same key in this source.
        """
        self.additional_data.update(other.additional_data)
        if self.audio_url is None:
            self.audio_url = other.audio_url

    @property
    def page_str(self):
        # Shortcut for the ``value`` of this source's type.
        return self.source_type.value
class SourceTypeSorting(TypedDict, total=False):
    """Keyword options that ``SourceCollection.get_sources`` forwards to
    ``SourceCollection.source_types``.

    Declared with ``total=False`` because every key is optional in practice:
    ``get_sources`` falls back to an empty dict, and each key maps to a
    parameter of ``source_types`` that has a default value.
    """

    # Key function used to order the source types.
    sort_key: Callable[[SourceType], Any]
    # Sort in reverse order when true.
    reverse: bool
    # Keep only source types that have a page.
    only_with_page: bool
class SourceCollection:
    """A de-duplicating container of ``Source`` objects.

    Sources are grouped by their source type and additionally indexed by the
    values of ``Source.indexing_values``. Appending a source whose indexing
    values hit an already indexed source merges it into that source instead
    of storing it a second time.
    """

    # Class-level initial value; ``append`` replaces it on the instance
    # whenever a new index key is registered.
    # NOTE(review): presumably used by callers to detect changes - confirm.
    __change_version__ = generate_id()

    # index key (see ``Source.indexing_values``) -> source
    _indexed_sources: Dict[str, Source]
    # source type -> sources of that type, in insertion order
    _sources_by_type: Dict[SourceType, List[Source]]

    def __init__(self, data: Optional[Iterable[Source]] = None, **kwargs):
        """
        Args:
            data (Iterable[Source], optional): Initial sources to add.
            **kwargs: Accepted and ignored.
        """
        self._sources_by_type = defaultdict(list)
        self._indexed_sources = {}

        self.extend(data or [])

    def source_types(
        self,
        only_with_page: bool = False,
        sort_key = lambda page: page.name,
        reverse: bool = False
    ) -> List[SourceType]:
        """
        Returns a list of all source types contained in this source collection.

        Args:
            only_with_page (bool, optional): If True, only returns source types that have a page, meaning you can download from them.
            sort_key (function, optional): A function that defines the sorting key for the source types. Defaults to lambda page: page.name.
            reverse (bool, optional): If True, sorts the source types in reverse order. Defaults to False.

        Returns:
            List[SourceType]: A sorted list of source types.
        """
        source_types: Iterable[SourceType] = self._sources_by_type
        if only_with_page:
            source_types = [st for st in source_types if st.has_page]

        return sorted(source_types, key=sort_key, reverse=reverse)

    def get_sources(
        self,
        *source_types: SourceType,
        source_type_sorting: Optional[SourceTypeSorting] = None
    ) -> Generator[Source, None, None]:
        """
        Retrieves sources based on the provided source types and source type sorting.

        Args:
            *source_types (SourceType): Variable number of source types to filter the sources.
            source_type_sorting (SourceTypeSorting): Sorting criteria for the source types. This is only relevant if no source types are provided.

        Yields:
            Source: The sources of each selected source type, type by type.
        """
        if not source_types:
            source_types = self.source_types(**(source_type_sorting or {}))

        for source_type in source_types:
            # Use .get() rather than []: the mapping is a defaultdict, so
            # indexing with a type that has no sources would insert an empty
            # list and make that type show up in source_types() and
            # homepage_list afterwards.
            yield from self._sources_by_type.get(source_type, ())

    def append(self, source: Source):
        """Add ``source``, merging it into an already indexed equivalent.

        Args:
            source (Source): The source to add. ``None`` is ignored.
        """
        if source is None:
            return

        # The first indexing value that is already known decides which
        # existing source the new one is merged into.
        existing_source = next(
            (
                self._indexed_sources[key]
                for key in source.indexing_values
                if key in self._indexed_sources
            ),
            None,
        )

        if existing_source is not None:
            existing_source.__merge__(source)
            source = existing_source
        else:
            self._sources_by_type[source.source_type].append(source)

        # Re-read the indexing values: after a merge the surviving source may
        # expose values it did not have before (e.g. a newly set audio_url).
        changed = False
        for key in source.indexing_values:
            if key not in self._indexed_sources:
                changed = True
            self._indexed_sources[key] = source

        if changed:
            self.__change_version__ = generate_id()

    def extend(self, sources: Iterable[Source]):
        """Append every source of ``sources`` in order."""
        for source in sources:
            self.append(source)

    def __iter__(self):
        yield from self.get_sources()

    def __merge__(self, other: SourceCollection, **kwargs):
        """Add all sources of ``other`` to this collection."""
        self.extend(other)

    @property
    def hash_url_list(self) -> List[str]:
        """The hashed URLs of all sources."""
        return [hash_url(source.url) for source in self.get_sources()]

    @property
    def url_list(self) -> List[str]:
        """The URLs of all sources."""
        return [source.url for source in self.get_sources()]

    @property
    def homepage_list(self) -> List[str]:
        """The ``homepage`` of every source type present in this collection."""
        return [source_type.homepage for source_type in self._sources_by_type]

    def indexing_values(self) -> Generator[Tuple[str, str], None, None]:
        """Yield an ``("url", key)`` pair for every index key of this collection."""
        for index in self._indexed_sources:
            yield "url", index