fix/reindex_before_collection #21

Merged
Hazel merged 32 commits from fix/reindex_before_collection into experimental 2024-05-06 17:36:28 +00:00
6 changed files with 72 additions and 25 deletions
Showing only changes of commit e93f6d754c - Show all commits

View File

@ -4,6 +4,7 @@ from collections import defaultdict
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any, Set
from .parents import OuterProxy
from ..utils import object_trace
from ..utils import output, BColors
T = TypeVar('T', bound=OuterProxy)
@ -80,11 +81,12 @@ class Collection(Generic[T]):
for e in self:
self._map_element(e)
def _find_object(self, __object: T) -> Optional[T]:
for c in self.push_to:
found, found_in = c._find_object(__object)
if found is not None:
return found, found_in
def _find_object(self, __object: T, no_push_to: bool = False) -> Optional[T]:
if not no_push_to:
for c in self.push_to:
found, found_in = c._find_object(__object, no_push_to=True)
if found is not None:
return found, found_in
self._remap()
@ -104,10 +106,20 @@ class Collection(Generic[T]):
:return:
"""
if __object is None:
return
existing_object, map_to = self._find_object(__object)
existing_object, map_to = self._find_object(__object, no_push_to=kwargs.get("no_push_to", False))
if map_to is self:
for other, contained in (c._find_object(__object, no_push_to=True) for c in self.pull_from):
output(other, __object, contained, color=BColors.RED)
if other is None:
continue
__object.__merge__(other, no_push_to=False, **kwargs)
contained.remove(other)
if existing_object is None:
# append
@ -135,22 +147,27 @@ class Collection(Generic[T]):
b_data = b.data.copy()
b_collection_for = b._collection_for.copy()
no_sync_collection.add(id(b))
kwargs["no_sync_collection"] = no_sync_collection
# kwargs["no_sync_collection"] = no_sync_collection
del b
a.extend(b_data, **kwargs)
for synced_with, key in b_collection_for.items():
synced_with.__setattr__(key, a)
a._collection_for[synced_with] = key
a.extend(b_data, **kwargs)
else:
# merge only if the two objects are not the same
if existing_object.id == __object.id:
return
existing_object.merge(__object, **kwargs)
map_to._map_element(existing_object)
map_to._map_element(existing_object)
def remove(self, __object: T) -> T:
self._data.remove(__object)
self._unmap_element(__object)
return __object
def contains(self, __object: T) -> bool:
return self._find_object(__object) is not None

View File

@ -144,6 +144,7 @@ class Song(Base):
}
self.feature_artist_collection.push_to = [self.main_artist_collection]
self.main_artist_collection.pull_from = [self.feature_artist_collection]
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
if object_type is Song:

View File

@ -498,7 +498,22 @@ class YoutubeMusic(SuperYouTube):
self.fetch_media_url(source=source, ydl_res=ydl_res)
artist_name = ydl_res.get("artist", ydl_res.get("uploader", "")).rstrip(" - Topic")
artist_names = []
uploader = ydl_res.get("uploader", "")
if uploader.endswith(" - Topic"):
artist_names = [uploader.rstrip(" - Topic")]
"""
elif "artist" in ydl_res:
artist_names = ydl_res.get("artist").split(", ")
"""
artist_list = [
Artist(
name=name,
source_list=[Source(
SourcePages.YOUTUBE_MUSIC,
f"https://music.youtube.com/channel/{ydl_res.get('channel_id', ydl_res.get('uploader_id', ''))}"
)]
) for name in artist_names]
album_list = []
if "album" in ydl_res:
@ -507,19 +522,14 @@ class YoutubeMusic(SuperYouTube):
date=ID3Timestamp.strptime(ydl_res.get("upload_date"), "%Y%m%d"),
))
artist_name = artist_names[0] if len(artist_names) > 0 else None
return Song(
title=ydl_res.get("track", clean_song_title(ydl_res.get("title"), artist_name=artist_name)),
note=ydl_res.get("descriptions"),
album_list=album_list,
length=int(ydl_res.get("duration", 0)) * 1000,
artwork=Artwork(*ydl_res.get("thumbnails", [])),
main_artist_list=[Artist(
name=artist_name,
source_list=[Source(
SourcePages.YOUTUBE_MUSIC,
f"https://music.youtube.com/channel/{ydl_res.get('channel_id', ydl_res.get('uploader_id', ''))}"
)]
)],
main_artist_list=artist_list,
source_list=[Source(
SourcePages.YOUTUBE_MUSIC,
f"https://music.youtube.com/watch?v={ydl_res.get('id')}"

View File

@ -3,24 +3,30 @@ from pathlib import Path
import json
import logging
import inspect
from typing import List
from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE, DEBUG_OBJECT_TRACE_CALLSTACK
from .config import config, read_config, write_config
from .enums.colors import BColors
from .path_manager import LOCATIONS
from .hacking import merge_args
"""
IO functions
"""
def _apply_color(msg: str, color: BColors) -> str:
if not isinstance(msg, str):
msg = str(msg)
if color is BColors.ENDC:
return msg
return color.value + msg + BColors.ENDC.value
def output(msg: str, color: BColors = BColors.ENDC):
print(_apply_color(msg, color))
@merge_args(print)
def output(*msg: List[str], color: BColors = BColors.ENDC, **kwargs):
print(*(_apply_color(s, color) for s in msg), **kwargs)
def user_input(msg: str, color: BColors = BColors.ENDC):

View File

@ -78,7 +78,14 @@ def _merge(
drop_args = []
if drop_kwonlyargs is None:
drop_kwonlyargs = []
source_spec = inspect.getfullargspec(source)
is_builtin = False
try:
source_spec = inspect.getfullargspec(source)
except TypeError:
is_builtin = True
source_spec = inspect.FullArgSpec(type(source).__name__, [], [], [], [], [], [])
dest_spec = inspect.getfullargspec(dest)
if source_spec.varargs or source_spec.varkw:
@ -128,13 +135,15 @@ def _merge(
'co_kwonlyargcount': len(kwonlyargs_merged),
'co_posonlyargcount': dest.__code__.co_posonlyargcount,
'co_nlocals': len(args_all),
'co_flags': source.__code__.co_flags,
'co_varnames': args_all,
'co_filename': dest.__code__.co_filename,
'co_name': dest.__code__.co_name,
'co_firstlineno': dest.__code__.co_firstlineno,
}
if hasattr(source, "__code__"):
replace_kwargs['co_flags'] = source.__code__.co_flags
if PY310:
replace_kwargs['co_linetable'] = dest.__code__.co_linetable
else:
@ -151,7 +160,7 @@ def _merge(
len(kwonlyargs_merged),
_blank.__code__.co_nlocals,
_blank.__code__.co_stacksize,
source.__code__.co_flags,
source.__code__.co_flags if hasattr(source, "__code__") else dest.__code__.co_flags,
_blank.__code__.co_code, (), (),
args_all, dest.__code__.co_filename,
dest.__code__.co_name,
@ -171,6 +180,9 @@ def _merge(
dest_ret = dest.__annotations__['return']
for v in ('__kwdefaults__', '__annotations__'):
if not hasattr(source, v):
continue
out = getattr(source, v)
if out is None:
out = {}

View File

@ -32,8 +32,9 @@ def unify(string: str) -> str:
string = translit(string, reversed=True)
except LanguageDetectionError:
pass
return string.lower()
string = unify_punctuation(string)
return string.lower().strip()
def fit_to_file_system(string: Union[str, Path], hidden_ok: bool = False) -> Union[str, Path]: