music-kraken-core/music_kraken/objects/source.py
Lars Noack da8887b279
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
draft: rewriting soure
2024-05-14 15:18:17 +02:00

177 lines
5.8 KiB
Python

from __future__ import annotations
from collections import defaultdict
from enum import Enum
from typing import List, Dict, Set, Tuple, Optional, Iterable, Generator
from urllib.parse import urlparse, ParseResult
from dataclasses import dataclass, field
from functools import cached_property
from ..utils import generate_id
from ..utils.enums import SourceType
from ..utils.config import youtube_settings
from ..utils.string_processing import hash_url, shorten_display_url
from .metadata import Mapping, Metadata
from .parents import OuterProxy
from .collection import Collection
@dataclass
class Source:
source_type: SourceType
url: str
referrer_page: SourceType = None
audio_url: Optional[str] = None
additional_data: dict = field(default_factory=dict)
def __post_init__(self):
self.referrer_page = self.referrer_page or self.source_type
@property
def parsed_url(self) -> ParseResult:
return urlparse(self.url)
@classmethod
def match_url(cls, url: str, referrer_page: SourceType) -> Optional[Source]:
"""
this shouldn't be used, unless you are not certain what the source is for
the reason is that it is more inefficient
"""
parsed_url = urlparse(url)
url = parsed_url.geturl()
if "musify" in parsed_url.netloc:
return cls(SourceType.MUSIFY, url, referrer_page=referrer_page)
if parsed_url.netloc in [_url.netloc for _url in youtube_settings['youtube_url']]:
return cls(SourceType.YOUTUBE, url, referrer_page=referrer_page)
if url.startswith("https://www.deezer"):
return cls(SourceType.DEEZER, url, referrer_page=referrer_page)
if url.startswith("https://open.spotify.com"):
return cls(SourceType.SPOTIFY, url, referrer_page=referrer_page)
if "bandcamp" in url:
return cls(SourceType.BANDCAMP, url, referrer_page=referrer_page)
if "wikipedia" in parsed_url.netloc:
return cls(SourceType.WIKIPEDIA, url, referrer_page=referrer_page)
if url.startswith("https://www.metal-archives.com/"):
return cls(SourceType.ENCYCLOPAEDIA_METALLUM, url, referrer_page=referrer_page)
# the less important once
if url.startswith("https://www.facebook"):
return cls(SourceType.FACEBOOK, url, referrer_page=referrer_page)
if url.startswith("https://www.instagram"):
return cls(SourceType.INSTAGRAM, url, referrer_page=referrer_page)
if url.startswith("https://twitter"):
return cls(SourceType.TWITTER, url, referrer_page=referrer_page)
if url.startswith("https://myspace.com"):
return cls(SourceType.MYSPACE, url, referrer_page=referrer_page)
@property
def hash_url(self) -> str:
return hash_url(self.url)
@property
def indexing_values(self) -> list:
r = [hash_url(self.url)]
if self.audio_url:
r.append(hash_url(self.audio_url))
return r
def __repr__(self) -> str:
return f"Src({self.source_type.value}: {shorten_display_url(self.url)})"
def __merge__(self, other: Source, **kwargs):
if self.audio_url is None:
self.audio_url = other.audio_url
self.additional_data.update(other.additional_data)
page_str = property(fget=lambda self: self.source_type.value)
class SourceCollection:
__change_version__ = generate_id()
_indexed_sources: Dict[str, Source]
_page_to_source_list: Dict[SourceType, List[Source]]
def __init__(self, data: Optional[Iterable[Source]] = None, **kwargs):
self._page_to_source_list = defaultdict(list)
self._indexed_sources = {}
self.extend(data or [])
def has_source_page(self, *source_pages: SourceType) -> bool:
return any(source_page in self._page_to_source_list for source_page in source_pages)
def get_sources(self, *source_pages: List[Source]) -> Generator[Source]:
if not len(source_pages):
source_pages = self.source_pages
for page in source_pages:
yield from self._page_to_source_list[page]
def append(self, source: Source):
if source is None:
return
existing_source = None
for key in source.indexing_values:
if key in self._indexed_sources:
existing_source = self._indexed_sources[key]
break
if existing_source is not None:
existing_source.__merge__(source)
source = existing_source
else:
self._page_to_source_list[source.source_type].append(source)
changed = False
for key in source.indexing_values:
if key not in self._indexed_sources:
changed = True
self._indexed_sources[key] = source
if changed:
self.__change_version__ = generate_id()
def extend(self, sources: Iterable[Source]):
for source in sources:
self.append(source)
def __iter__(self):
yield from self.get_sources()
def __merge__(self, other: SourceCollection, **kwargs):
self.extend(other)
@property
def source_pages(self) -> Iterable[SourceType]:
return sorted(self._page_to_source_list.keys(), key=lambda page: page.value)
@property
def hash_url_list(self) -> List[str]:
return [hash_url(source.url) for source in self.get_sources()]
@property
def url_list(self) -> List[str]:
return [source.url for source in self.get_sources()]
@property
def homepage_list(self) -> List[str]:
return [source.homepage for source in self.source_pages]
def indexing_values(self) -> Generator[Tuple[str, str], None, None]:
for index in self._indexed_sources:
yield "url", index