Compare commits

...

2 Commits

Author SHA1 Message Date
e93f6d754c draft
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
2024-04-30 12:32:55 +02:00
796f609d86 fix: push to 2024-04-30 09:31:38 +02:00
7 changed files with 75 additions and 33 deletions

View File

@ -4,6 +4,7 @@ from collections import defaultdict
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any, Set from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any, Set
from .parents import OuterProxy from .parents import OuterProxy
from ..utils import object_trace from ..utils import object_trace
from ..utils import output, BColors
T = TypeVar('T', bound=OuterProxy) T = TypeVar('T', bound=OuterProxy)
@ -80,11 +81,12 @@ class Collection(Generic[T]):
for e in self: for e in self:
self._map_element(e) self._map_element(e)
def _find_object(self, __object: T) -> Optional[T]: def _find_object(self, __object: T, no_push_to: bool = False) -> Optional[T]:
if not no_push_to:
for c in self.push_to: for c in self.push_to:
found = c._find_object(__object) found, found_in = c._find_object(__object, no_push_to=True)
if found is not None: if found is not None:
return found, c return found, found_in
self._remap() self._remap()
@ -92,6 +94,8 @@ class Collection(Generic[T]):
if value in self._indexed_values[name]: if value in self._indexed_values[name]:
return self._indexed_values[name][value], self return self._indexed_values[name][value], self
return None, self
def append(self, __object: Optional[T], **kwargs): def append(self, __object: Optional[T], **kwargs):
""" """
If an object, that represents the same entity exists in a relevant collection, If an object, that represents the same entity exists in a relevant collection,
@ -102,10 +106,20 @@ class Collection(Generic[T]):
:return: :return:
""" """
if __object is None: if __object is None:
return return
existing_object, map_to = self._find_object(__object) existing_object, map_to = self._find_object(__object, no_push_to=kwargs.get("no_push_to", False))
if map_to is self:
for other, contained in (c._find_object(__object, no_push_to=True) for c in self.pull_from):
output(other, __object, contained, color=BColors.RED)
if other is None:
continue
__object.__merge__(other, no_push_to=False, **kwargs)
contained.remove(other)
if existing_object is None: if existing_object is None:
# append # append
@ -133,15 +147,15 @@ class Collection(Generic[T]):
b_data = b.data.copy() b_data = b.data.copy()
b_collection_for = b._collection_for.copy() b_collection_for = b._collection_for.copy()
no_sync_collection.add(id(b)) no_sync_collection.add(id(b))
kwargs["no_sync_collection"] = no_sync_collection # kwargs["no_sync_collection"] = no_sync_collection
del b del b
a.extend(b_data, **kwargs)
for synced_with, key in b_collection_for.items(): for synced_with, key in b_collection_for.items():
synced_with.__setattr__(key, a) synced_with.__setattr__(key, a)
a._collection_for[synced_with] = key a._collection_for[synced_with] = key
a.extend(b_data, **kwargs)
else: else:
# merge only if the two objects are not the same # merge only if the two objects are not the same
if existing_object.id == __object.id: if existing_object.id == __object.id:
@ -150,6 +164,11 @@ class Collection(Generic[T]):
existing_object.merge(__object, **kwargs) existing_object.merge(__object, **kwargs)
map_to._map_element(existing_object) map_to._map_element(existing_object)
def remove(self, __object: T) -> T:
self._data.remove(__object)
self._unmap_element(__object)
return __object
def contains(self, __object: T) -> bool: def contains(self, __object: T) -> bool:
return self._find_object(__object) is not None return self._find_object(__object) is not None

View File

@ -144,6 +144,7 @@ class Song(Base):
} }
self.feature_artist_collection.push_to = [self.main_artist_collection] self.feature_artist_collection.push_to = [self.main_artist_collection]
self.main_artist_collection.pull_from = [self.feature_artist_collection]
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]): def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
if object_type is Song: if object_type is Song:

View File

@ -56,14 +56,7 @@ def music_responsive_list_item_renderer(renderer: dict) -> List[DatabaseObject]:
for song in song_list: for song in song_list:
song.album_collection.extend(album_list) song.album_collection.extend(album_list)
song.feature_artist_collection.extend(artist_list)
for artist in artist_list:
existing_artist = song.main_artist_collection._find_object(artist)
if existing_artist is None:
song.feature_artist_collection.append(artist)
else:
existing_artist.merge(artist)
if len(song_list) > 0: if len(song_list) > 0:
return song_list return song_list

View File

@ -498,7 +498,22 @@ class YoutubeMusic(SuperYouTube):
self.fetch_media_url(source=source, ydl_res=ydl_res) self.fetch_media_url(source=source, ydl_res=ydl_res)
artist_name = ydl_res.get("artist", ydl_res.get("uploader", "")).rstrip(" - Topic") artist_names = []
uploader = ydl_res.get("uploader", "")
if uploader.endswith(" - Topic"):
artist_names = [uploader.rstrip(" - Topic")]
"""
elif "artist" in ydl_res:
artist_names = ydl_res.get("artist").split(", ")
"""
artist_list = [
Artist(
name=name,
source_list=[Source(
SourcePages.YOUTUBE_MUSIC,
f"https://music.youtube.com/channel/{ydl_res.get('channel_id', ydl_res.get('uploader_id', ''))}"
)]
) for name in artist_names]
album_list = [] album_list = []
if "album" in ydl_res: if "album" in ydl_res:
@ -507,19 +522,14 @@ class YoutubeMusic(SuperYouTube):
date=ID3Timestamp.strptime(ydl_res.get("upload_date"), "%Y%m%d"), date=ID3Timestamp.strptime(ydl_res.get("upload_date"), "%Y%m%d"),
)) ))
artist_name = artist_names[0] if len(artist_names) > 0 else None
return Song( return Song(
title=ydl_res.get("track", clean_song_title(ydl_res.get("title"), artist_name=artist_name)), title=ydl_res.get("track", clean_song_title(ydl_res.get("title"), artist_name=artist_name)),
note=ydl_res.get("descriptions"), note=ydl_res.get("descriptions"),
album_list=album_list, album_list=album_list,
length=int(ydl_res.get("duration", 0)) * 1000, length=int(ydl_res.get("duration", 0)) * 1000,
artwork=Artwork(*ydl_res.get("thumbnails", [])), artwork=Artwork(*ydl_res.get("thumbnails", [])),
main_artist_list=[Artist( main_artist_list=artist_list,
name=artist_name,
source_list=[Source(
SourcePages.YOUTUBE_MUSIC,
f"https://music.youtube.com/channel/{ydl_res.get('channel_id', ydl_res.get('uploader_id', ''))}"
)]
)],
source_list=[Source( source_list=[Source(
SourcePages.YOUTUBE_MUSIC, SourcePages.YOUTUBE_MUSIC,
f"https://music.youtube.com/watch?v={ydl_res.get('id')}" f"https://music.youtube.com/watch?v={ydl_res.get('id')}"

View File

@ -3,24 +3,30 @@ from pathlib import Path
import json import json
import logging import logging
import inspect import inspect
from typing import List
from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE, DEBUG_OBJECT_TRACE_CALLSTACK from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE, DEBUG_OBJECT_TRACE_CALLSTACK
from .config import config, read_config, write_config from .config import config, read_config, write_config
from .enums.colors import BColors from .enums.colors import BColors
from .path_manager import LOCATIONS from .path_manager import LOCATIONS
from .hacking import merge_args
""" """
IO functions IO functions
""" """
def _apply_color(msg: str, color: BColors) -> str: def _apply_color(msg: str, color: BColors) -> str:
if not isinstance(msg, str):
msg = str(msg)
if color is BColors.ENDC: if color is BColors.ENDC:
return msg return msg
return color.value + msg + BColors.ENDC.value return color.value + msg + BColors.ENDC.value
def output(msg: str, color: BColors = BColors.ENDC): @merge_args(print)
print(_apply_color(msg, color)) def output(*msg: List[str], color: BColors = BColors.ENDC, **kwargs):
print(*(_apply_color(s, color) for s in msg), **kwargs)
def user_input(msg: str, color: BColors = BColors.ENDC): def user_input(msg: str, color: BColors = BColors.ENDC):

View File

@ -78,7 +78,14 @@ def _merge(
drop_args = [] drop_args = []
if drop_kwonlyargs is None: if drop_kwonlyargs is None:
drop_kwonlyargs = [] drop_kwonlyargs = []
is_builtin = False
try:
source_spec = inspect.getfullargspec(source) source_spec = inspect.getfullargspec(source)
except TypeError:
is_builtin = True
source_spec = inspect.FullArgSpec(type(source).__name__, [], [], [], [], [], [])
dest_spec = inspect.getfullargspec(dest) dest_spec = inspect.getfullargspec(dest)
if source_spec.varargs or source_spec.varkw: if source_spec.varargs or source_spec.varkw:
@ -128,13 +135,15 @@ def _merge(
'co_kwonlyargcount': len(kwonlyargs_merged), 'co_kwonlyargcount': len(kwonlyargs_merged),
'co_posonlyargcount': dest.__code__.co_posonlyargcount, 'co_posonlyargcount': dest.__code__.co_posonlyargcount,
'co_nlocals': len(args_all), 'co_nlocals': len(args_all),
'co_flags': source.__code__.co_flags,
'co_varnames': args_all, 'co_varnames': args_all,
'co_filename': dest.__code__.co_filename, 'co_filename': dest.__code__.co_filename,
'co_name': dest.__code__.co_name, 'co_name': dest.__code__.co_name,
'co_firstlineno': dest.__code__.co_firstlineno, 'co_firstlineno': dest.__code__.co_firstlineno,
} }
if hasattr(source, "__code__"):
replace_kwargs['co_flags'] = source.__code__.co_flags
if PY310: if PY310:
replace_kwargs['co_linetable'] = dest.__code__.co_linetable replace_kwargs['co_linetable'] = dest.__code__.co_linetable
else: else:
@ -151,7 +160,7 @@ def _merge(
len(kwonlyargs_merged), len(kwonlyargs_merged),
_blank.__code__.co_nlocals, _blank.__code__.co_nlocals,
_blank.__code__.co_stacksize, _blank.__code__.co_stacksize,
source.__code__.co_flags, source.__code__.co_flags if hasattr(source, "__code__") else dest.__code__.co_flags,
_blank.__code__.co_code, (), (), _blank.__code__.co_code, (), (),
args_all, dest.__code__.co_filename, args_all, dest.__code__.co_filename,
dest.__code__.co_name, dest.__code__.co_name,
@ -171,6 +180,9 @@ def _merge(
dest_ret = dest.__annotations__['return'] dest_ret = dest.__annotations__['return']
for v in ('__kwdefaults__', '__annotations__'): for v in ('__kwdefaults__', '__annotations__'):
if not hasattr(source, v):
continue
out = getattr(source, v) out = getattr(source, v)
if out is None: if out is None:
out = {} out = {}

View File

@ -33,7 +33,8 @@ def unify(string: str) -> str:
except LanguageDetectionError: except LanguageDetectionError:
pass pass
return string.lower() string = unify_punctuation(string)
return string.lower().strip()
def fit_to_file_system(string: Union[str, Path], hidden_ok: bool = False) -> Union[str, Path]: def fit_to_file_system(string: Union[str, Path], hidden_ok: bool = False) -> Union[str, Path]: