draft: string processing
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
parent aa50d2cf20
commit c6bdf724e3
@@ -2,10 +2,12 @@ from __future__ import annotations
 from collections import defaultdict
 from enum import Enum
-from typing import List, Dict, Set, Tuple, Optional, Iterable
-from urllib.parse import urlparse
-from dataclasses import dataclass
+from typing import List, Dict, Set, Tuple, Optional, Iterable, Generator
+from urllib.parse import urlparse, ParseResult
+from dataclasses import dataclass, field
+from functools import cached_property

 from ..utils import generate_id
 from ..utils.enums.source import SourcePages, SourceTypes
 from ..utils.config import youtube_settings
+from ..utils.string_processing import hash_url
@@ -17,25 +19,21 @@ from .collection import Collection


-class Source(OuterProxy):
+@dataclass
+class Source:
     url: str
     page_enum: SourcePages
     referrer_page: SourcePages
-    audio_url: Optional[str]
-
-    _default_factories = {
-        "audio_url": lambda: None,
-    }
-
-    # This is automatically generated
-    def __init__(self, page_enum: SourcePages, url: str, referrer_page: SourcePages = None, audio_url: str = None,
-                 **kwargs) -> None:
-
-        if referrer_page is None:
-            referrer_page = page_enum
-
-        super().__init__(url=url, page_enum=page_enum, referrer_page=referrer_page, audio_url=audio_url, **kwargs)
+    audio_url: str
+
+    id: int = field(default_factory=generate_id)
+    additional_data: dict = field(default_factory=dict)
+
+    def __post_init__(self):
+        self.referrer_page = self.referrer_page or self.page_enum
+
+    @cached_property
+    def parsed_url(self) -> ParseResult:
+        return urlparse(self.url)

     @classmethod
     def match_url(cls, url: str, referrer_page: SourcePages) -> Optional["Source"]:
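With the OuterProxy base and the hand-written __init__ gone, Source behaves like a plain dataclass: `id` and `additional_data` are filled by `field(default_factory=...)`, `__post_init__` falls back to `page_enum` when `referrer_page` is falsy, and `parsed_url` caches its `urlparse` result after the first access. A minimal sketch of the resulting behavior (the SourcePages.YOUTUBE member and the URL are illustrative assumptions, not taken from this diff):

    # illustrative only; SourcePages.YOUTUBE is assumed to exist
    source = Source(
        url="https://www.youtube.com/watch?v=dQw4w9WgXcQ",
        page_enum=SourcePages.YOUTUBE,
        referrer_page=None,
        audio_url=None,
    )
    assert source.referrer_page is SourcePages.YOUTUBE  # set by __post_init__
    source.parsed_url.netloc  # "www.youtube.com", parsed once via cached_property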
@@ -122,16 +120,23 @@ class Source(OuterProxy):
     homepage = property(fget=lambda self: SourcePages.get_homepage(self.page_enum))


-class SourceCollection(Collection):
+class SourceCollection:
     _page_to_source_list: Dict[SourcePages, List[Source]]

     def __init__(self, data: Optional[Iterable[Source]] = None, **kwargs):
-        self._page_to_source_list: Dict[SourcePages, List[Source]] = defaultdict(list)
-
-        super().__init__(data=data, **kwargs)
+        self._page_to_source_list = defaultdict(list)
+
+    def get_sources(self, *source_pages: SourcePages) -> Generator[Source, None, None]:
+        for page in source_pages:
+            yield from self._page_to_source_list[page]

-    def _map_element(self, __object: Source, **kwargs):
-        super()._map_element(__object, **kwargs)
-        self._page_to_source_list[__object.page_enum].append(__object)
+    def append(self, source: Source):
+        pass
+
+    def extend(self, sources: Iterable[Source]):
+        for source in sources:
+            self.append(source)

     @property
     def source_pages(self) -> Set[SourcePages]:
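SourceCollection now indexes sources in a `defaultdict(list)` keyed by SourcePages, and `get_sources` lazily chains the per-page lists. Note that in this draft `data` is accepted but not yet consumed and `append` is still a stub, so the index stays empty; the intended call pattern would look roughly like this (the page members are assumptions for illustration):

    collection = SourceCollection()
    collection.extend([source])  # delegates to append(), currently a no-op

    # once append() populates _page_to_source_list, this yields matches per page:
    for s in collection.get_sources(SourcePages.YOUTUBE, SourcePages.BANDCAMP):
        print(s.url)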
@@ -6,6 +6,7 @@ from functools import lru_cache
 from transliterate.exceptions import LanguageDetectionError
 from transliterate import translit
 from pathvalidate import sanitize_filename
+from urllib.parse import urlparse, ParseResult, parse_qs


 COMMON_TITLE_APPENDIX_LIST: Tuple[str, ...] = (
@@ -21,6 +22,7 @@ def unify(string: str) -> str:
     returns a unified str, to make comparisons easy.
     a unified string has the following attributes:
     - is lowercase
+    - is transliterated to Latin characters from e.g. Cyrillic
     """

     if string is None:
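The new docstring bullet records behavior unify gets from transliterate's translit: non-Latin input is mapped to Latin characters before the lowercase comparison form is built. Roughly, and assuming the rest of the function is unchanged in this revision:

    unify("Кино")  # -> "kino": transliterated to Latin, then lowercased
    unify(None)    # caught by the None check directly below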
@@ -132,8 +134,27 @@ def unify_punctuation(to_unify: str) -> str:
         to_unify = to_unify.replace(char, UNIFY_TO)
     return to_unify

-def hash_url(url: str) -> int:
-    return url.strip().lower().lstrip("https://").lstrip("http://")
+def hash_url(url: Union[str, ParseResult]) -> str:
+    if isinstance(url, str):
+        url = urlparse(url)
+
+    query = url.query
+    query_dict: Optional[dict] = None
+    try:
+        query_dict: dict = parse_qs(url.query, strict_parsing=True)
+    except ValueError:
+        # the query couldn't be parsed
+        pass
+
+    if isinstance(query_dict, dict):
+        # sort keys alphabetically
+        query = ""
+        for key, value in sorted(query_dict.items(), key=lambda i: i[0]):
+            query += f"_{key.strip()}_{''.join(i.strip() for i in value)}"
+
+    r = f"{url.netloc}_{url.path.replace('/', '_')}{query}"
+    r = r.lower().strip()
+    return r


 def remove_feature_part_from_track(title: str) -> str:
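The rewritten hash_url now actually returns a str (the old annotation promised an int) and builds a canonical key instead of a mangled prefix strip; the old one-liner also misused lstrip, which removes a character set rather than a prefix. In the new version the scheme is discarded, path separators become underscores, and query parameters are sorted by key, so parameter order and http/https differences no longer change the hash. For example, with made-up URLs:

    hash_url("https://example.com/watch?v=abc&list=xyz")
    # -> "example.com__watch_list_xyz_v_abc"

    hash_url("http://example.com/watch?list=xyz&v=abc")
    # -> the same string: scheme and parameter order are normalized away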