diff --git a/music_kraken/objects/source.py b/music_kraken/objects/source.py index 64cd433..c122d11 100644 --- a/music_kraken/objects/source.py +++ b/music_kraken/objects/source.py @@ -2,10 +2,12 @@ from __future__ import annotations from collections import defaultdict from enum import Enum -from typing import List, Dict, Set, Tuple, Optional, Iterable -from urllib.parse import urlparse -from dataclasses import dataclass +from typing import List, Dict, Set, Tuple, Optional, Iterable, Generator +from urllib.parse import urlparse, ParseResult +from dataclasses import dataclass, field +from functools import cached_property +from ..utils import generate_id from ..utils.enums.source import SourcePages, SourceTypes from ..utils.config import youtube_settings from ..utils.string_processing import hash_url @@ -17,25 +19,21 @@ from .collection import Collection @dataclass -class Source(OuterProxy): +class Source: url: str page_enum: SourcePages referrer_page: SourcePages + audio_url: Optional[str] - audio_url: str + id: int = field(default_factory=generate_id) + additional_data: dict = field(default_factory=dict) - _default_factories = { - "audio_url": lambda: None, - } - - # This is automatically generated - def __init__(self, page_enum: SourcePages, url: str, referrer_page: SourcePages = None, audio_url: str = None, - **kwargs) -> None: - - if referrer_page is None: - referrer_page = page_enum - - super().__init__(url=url, page_enum=page_enum, referrer_page=referrer_page, audio_url=audio_url, **kwargs) + def __post_init__(self): + self.referrer_page = self.referrer_page or self.page_enum + + @cached_property + def parsed_url(self) -> ParseResult: + return urlparse(self.url) @classmethod def match_url(cls, url: str, referrer_page: SourcePages) -> Optional["Source"]: @@ -122,16 +120,23 @@ class Source(OuterProxy): homepage = property(fget=lambda self: SourcePages.get_homepage(self.page_enum)) -class SourceCollection(Collection): +class SourceCollection: + _page_to_source_list: Dict[SourcePages, List[Source]] + + def __init__(self, data: Optional[Iterable[Source]] = None, **kwargs): - self._page_to_source_list: Dict[SourcePages, List[Source]] = defaultdict(list) + self._page_to_source_list = defaultdict(list) - super().__init__(data=data, **kwargs) + def get_sources(self, *source_pages: List[Source]) -> Generator[Source]: + for page in source_pages: + yield from self._page_to_source_list[page] - def _map_element(self, __object: Source, **kwargs): - super()._map_element(__object, **kwargs) + def append(self, source: Source): + pass - self._page_to_source_list[__object.page_enum].append(__object) + def extend(self, sources: Iterable[Source]): + for source in sources: + self.append(source) @property def source_pages(self) -> Set[SourcePages]: diff --git a/music_kraken/utils/string_processing.py b/music_kraken/utils/string_processing.py index 9acd3c8..0b45c6f 100644 --- a/music_kraken/utils/string_processing.py +++ b/music_kraken/utils/string_processing.py @@ -6,6 +6,7 @@ from functools import lru_cache from transliterate.exceptions import LanguageDetectionError from transliterate import translit from pathvalidate import sanitize_filename +from urllib.parse import urlparse, ParseResult, parse_qs COMMON_TITLE_APPENDIX_LIST: Tuple[str, ...] = ( @@ -21,6 +22,7 @@ def unify(string: str) -> str: returns a unified str, to make comparisons easy. a unified string has the following attributes: - is lowercase + - is transliterated to Latin characters from e.g. Cyrillic """ if string is None: @@ -132,8 +134,27 @@ def unify_punctuation(to_unify: str) -> str: to_unify = to_unify.replace(char, UNIFY_TO) return to_unify -def hash_url(url: str) -> int: - return url.strip().lower().lstrip("https://").lstrip("http://") +def hash_url(url: Union[str, ParseResult]) -> str: + if isinstance(url, str): + url = urlparse(url) + + query = url.query + query_dict: Optional[dict] = None + try: + query_dict: dict = parse_qs(url.query, strict_parsing=True) + except ValueError: + # the query couldn't be parsed + pass + + if isinstance(query_dict, dict): + # sort keys alphabetically + query = "" + for key, value in sorted(query_dict.items(), key=lambda i: i[0]): + query += f"_{key.strip()}_{''.join(i.strip() for i in value)}" + + r = f"{url.netloc}_{url.path.replace('/', '_')}{query}" + r = r.lower().strip() + return r def remove_feature_part_from_track(title: str) -> str: