Compare commits
No commits in common. "e93f6d754cb7d92315f4c8e51c335347d705ddb4" and "312e26ec44f7008f60ea7639285cad56f9698e8a" have entirely different histories.
e93f6d754c
...
312e26ec44
@ -4,7 +4,6 @@ from collections import defaultdict
|
|||||||
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any, Set
|
from typing import TypeVar, Generic, Dict, Optional, Iterable, List, Iterator, Tuple, Generator, Union, Any, Set
|
||||||
from .parents import OuterProxy
|
from .parents import OuterProxy
|
||||||
from ..utils import object_trace
|
from ..utils import object_trace
|
||||||
from ..utils import output, BColors
|
|
||||||
|
|
||||||
T = TypeVar('T', bound=OuterProxy)
|
T = TypeVar('T', bound=OuterProxy)
|
||||||
|
|
||||||
@ -81,12 +80,11 @@ class Collection(Generic[T]):
|
|||||||
for e in self:
|
for e in self:
|
||||||
self._map_element(e)
|
self._map_element(e)
|
||||||
|
|
||||||
def _find_object(self, __object: T, no_push_to: bool = False) -> Optional[T]:
|
def _find_object(self, __object: T) -> Optional[T]:
|
||||||
if not no_push_to:
|
for c in self.push_to:
|
||||||
for c in self.push_to:
|
found = c._find_object(__object)
|
||||||
found, found_in = c._find_object(__object, no_push_to=True)
|
if found is not None:
|
||||||
if found is not None:
|
return found, c
|
||||||
return found, found_in
|
|
||||||
|
|
||||||
self._remap()
|
self._remap()
|
||||||
|
|
||||||
@ -94,8 +92,6 @@ class Collection(Generic[T]):
|
|||||||
if value in self._indexed_values[name]:
|
if value in self._indexed_values[name]:
|
||||||
return self._indexed_values[name][value], self
|
return self._indexed_values[name][value], self
|
||||||
|
|
||||||
return None, self
|
|
||||||
|
|
||||||
def append(self, __object: Optional[T], **kwargs):
|
def append(self, __object: Optional[T], **kwargs):
|
||||||
"""
|
"""
|
||||||
If an object, that represents the same entity exists in a relevant collection,
|
If an object, that represents the same entity exists in a relevant collection,
|
||||||
@ -106,20 +102,10 @@ class Collection(Generic[T]):
|
|||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
if __object is None:
|
if __object is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
existing_object, map_to = self._find_object(__object, no_push_to=kwargs.get("no_push_to", False))
|
existing_object, map_to = self._find_object(__object)
|
||||||
|
|
||||||
if map_to is self:
|
|
||||||
for other, contained in (c._find_object(__object, no_push_to=True) for c in self.pull_from):
|
|
||||||
output(other, __object, contained, color=BColors.RED)
|
|
||||||
if other is None:
|
|
||||||
continue
|
|
||||||
|
|
||||||
__object.__merge__(other, no_push_to=False, **kwargs)
|
|
||||||
contained.remove(other)
|
|
||||||
|
|
||||||
if existing_object is None:
|
if existing_object is None:
|
||||||
# append
|
# append
|
||||||
@ -147,27 +133,22 @@ class Collection(Generic[T]):
|
|||||||
b_data = b.data.copy()
|
b_data = b.data.copy()
|
||||||
b_collection_for = b._collection_for.copy()
|
b_collection_for = b._collection_for.copy()
|
||||||
no_sync_collection.add(id(b))
|
no_sync_collection.add(id(b))
|
||||||
# kwargs["no_sync_collection"] = no_sync_collection
|
kwargs["no_sync_collection"] = no_sync_collection
|
||||||
del b
|
del b
|
||||||
|
|
||||||
a.extend(b_data, **kwargs)
|
|
||||||
|
|
||||||
for synced_with, key in b_collection_for.items():
|
for synced_with, key in b_collection_for.items():
|
||||||
synced_with.__setattr__(key, a)
|
synced_with.__setattr__(key, a)
|
||||||
a._collection_for[synced_with] = key
|
a._collection_for[synced_with] = key
|
||||||
|
|
||||||
|
a.extend(b_data, **kwargs)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# merge only if the two objects are not the same
|
# merge only if the two objects are not the same
|
||||||
if existing_object.id == __object.id:
|
if existing_object.id == __object.id:
|
||||||
return
|
return
|
||||||
|
|
||||||
existing_object.merge(__object, **kwargs)
|
existing_object.merge(__object, **kwargs)
|
||||||
map_to._map_element(existing_object)
|
map_to._map_element(existing_object)
|
||||||
|
|
||||||
def remove(self, __object: T) -> T:
|
|
||||||
self._data.remove(__object)
|
|
||||||
self._unmap_element(__object)
|
|
||||||
return __object
|
|
||||||
|
|
||||||
def contains(self, __object: T) -> bool:
|
def contains(self, __object: T) -> bool:
|
||||||
return self._find_object(__object) is not None
|
return self._find_object(__object) is not None
|
||||||
|
@ -144,7 +144,6 @@ class Song(Base):
|
|||||||
}
|
}
|
||||||
|
|
||||||
self.feature_artist_collection.push_to = [self.main_artist_collection]
|
self.feature_artist_collection.push_to = [self.main_artist_collection]
|
||||||
self.main_artist_collection.pull_from = [self.feature_artist_collection]
|
|
||||||
|
|
||||||
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
|
def _add_other_db_objects(self, object_type: Type[OuterProxy], object_list: List[OuterProxy]):
|
||||||
if object_type is Song:
|
if object_type is Song:
|
||||||
|
@ -56,7 +56,14 @@ def music_responsive_list_item_renderer(renderer: dict) -> List[DatabaseObject]:
|
|||||||
|
|
||||||
for song in song_list:
|
for song in song_list:
|
||||||
song.album_collection.extend(album_list)
|
song.album_collection.extend(album_list)
|
||||||
song.feature_artist_collection.extend(artist_list)
|
|
||||||
|
for artist in artist_list:
|
||||||
|
existing_artist = song.main_artist_collection._find_object(artist)
|
||||||
|
|
||||||
|
if existing_artist is None:
|
||||||
|
song.feature_artist_collection.append(artist)
|
||||||
|
else:
|
||||||
|
existing_artist.merge(artist)
|
||||||
|
|
||||||
if len(song_list) > 0:
|
if len(song_list) > 0:
|
||||||
return song_list
|
return song_list
|
||||||
|
@ -498,22 +498,7 @@ class YoutubeMusic(SuperYouTube):
|
|||||||
|
|
||||||
self.fetch_media_url(source=source, ydl_res=ydl_res)
|
self.fetch_media_url(source=source, ydl_res=ydl_res)
|
||||||
|
|
||||||
artist_names = []
|
artist_name = ydl_res.get("artist", ydl_res.get("uploader", "")).rstrip(" - Topic")
|
||||||
uploader = ydl_res.get("uploader", "")
|
|
||||||
if uploader.endswith(" - Topic"):
|
|
||||||
artist_names = [uploader.rstrip(" - Topic")]
|
|
||||||
"""
|
|
||||||
elif "artist" in ydl_res:
|
|
||||||
artist_names = ydl_res.get("artist").split(", ")
|
|
||||||
"""
|
|
||||||
artist_list = [
|
|
||||||
Artist(
|
|
||||||
name=name,
|
|
||||||
source_list=[Source(
|
|
||||||
SourcePages.YOUTUBE_MUSIC,
|
|
||||||
f"https://music.youtube.com/channel/{ydl_res.get('channel_id', ydl_res.get('uploader_id', ''))}"
|
|
||||||
)]
|
|
||||||
) for name in artist_names]
|
|
||||||
|
|
||||||
album_list = []
|
album_list = []
|
||||||
if "album" in ydl_res:
|
if "album" in ydl_res:
|
||||||
@ -522,14 +507,19 @@ class YoutubeMusic(SuperYouTube):
|
|||||||
date=ID3Timestamp.strptime(ydl_res.get("upload_date"), "%Y%m%d"),
|
date=ID3Timestamp.strptime(ydl_res.get("upload_date"), "%Y%m%d"),
|
||||||
))
|
))
|
||||||
|
|
||||||
artist_name = artist_names[0] if len(artist_names) > 0 else None
|
|
||||||
return Song(
|
return Song(
|
||||||
title=ydl_res.get("track", clean_song_title(ydl_res.get("title"), artist_name=artist_name)),
|
title=ydl_res.get("track", clean_song_title(ydl_res.get("title"), artist_name=artist_name)),
|
||||||
note=ydl_res.get("descriptions"),
|
note=ydl_res.get("descriptions"),
|
||||||
album_list=album_list,
|
album_list=album_list,
|
||||||
length=int(ydl_res.get("duration", 0)) * 1000,
|
length=int(ydl_res.get("duration", 0)) * 1000,
|
||||||
artwork=Artwork(*ydl_res.get("thumbnails", [])),
|
artwork=Artwork(*ydl_res.get("thumbnails", [])),
|
||||||
main_artist_list=artist_list,
|
main_artist_list=[Artist(
|
||||||
|
name=artist_name,
|
||||||
|
source_list=[Source(
|
||||||
|
SourcePages.YOUTUBE_MUSIC,
|
||||||
|
f"https://music.youtube.com/channel/{ydl_res.get('channel_id', ydl_res.get('uploader_id', ''))}"
|
||||||
|
)]
|
||||||
|
)],
|
||||||
source_list=[Source(
|
source_list=[Source(
|
||||||
SourcePages.YOUTUBE_MUSIC,
|
SourcePages.YOUTUBE_MUSIC,
|
||||||
f"https://music.youtube.com/watch?v={ydl_res.get('id')}"
|
f"https://music.youtube.com/watch?v={ydl_res.get('id')}"
|
||||||
|
@ -3,30 +3,24 @@ from pathlib import Path
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import inspect
|
import inspect
|
||||||
from typing import List
|
|
||||||
|
|
||||||
from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE, DEBUG_OBJECT_TRACE_CALLSTACK
|
from .shared import DEBUG, DEBUG_LOGGING, DEBUG_DUMP, DEBUG_TRACE, DEBUG_OBJECT_TRACE, DEBUG_OBJECT_TRACE_CALLSTACK
|
||||||
from .config import config, read_config, write_config
|
from .config import config, read_config, write_config
|
||||||
from .enums.colors import BColors
|
from .enums.colors import BColors
|
||||||
from .path_manager import LOCATIONS
|
from .path_manager import LOCATIONS
|
||||||
from .hacking import merge_args
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
IO functions
|
IO functions
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def _apply_color(msg: str, color: BColors) -> str:
|
def _apply_color(msg: str, color: BColors) -> str:
|
||||||
if not isinstance(msg, str):
|
|
||||||
msg = str(msg)
|
|
||||||
|
|
||||||
if color is BColors.ENDC:
|
if color is BColors.ENDC:
|
||||||
return msg
|
return msg
|
||||||
return color.value + msg + BColors.ENDC.value
|
return color.value + msg + BColors.ENDC.value
|
||||||
|
|
||||||
|
|
||||||
@merge_args(print)
|
def output(msg: str, color: BColors = BColors.ENDC):
|
||||||
def output(*msg: List[str], color: BColors = BColors.ENDC, **kwargs):
|
print(_apply_color(msg, color))
|
||||||
print(*(_apply_color(s, color) for s in msg), **kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
def user_input(msg: str, color: BColors = BColors.ENDC):
|
def user_input(msg: str, color: BColors = BColors.ENDC):
|
||||||
|
@ -78,14 +78,7 @@ def _merge(
|
|||||||
drop_args = []
|
drop_args = []
|
||||||
if drop_kwonlyargs is None:
|
if drop_kwonlyargs is None:
|
||||||
drop_kwonlyargs = []
|
drop_kwonlyargs = []
|
||||||
|
source_spec = inspect.getfullargspec(source)
|
||||||
is_builtin = False
|
|
||||||
try:
|
|
||||||
source_spec = inspect.getfullargspec(source)
|
|
||||||
except TypeError:
|
|
||||||
is_builtin = True
|
|
||||||
source_spec = inspect.FullArgSpec(type(source).__name__, [], [], [], [], [], [])
|
|
||||||
|
|
||||||
dest_spec = inspect.getfullargspec(dest)
|
dest_spec = inspect.getfullargspec(dest)
|
||||||
|
|
||||||
if source_spec.varargs or source_spec.varkw:
|
if source_spec.varargs or source_spec.varkw:
|
||||||
@ -135,15 +128,13 @@ def _merge(
|
|||||||
'co_kwonlyargcount': len(kwonlyargs_merged),
|
'co_kwonlyargcount': len(kwonlyargs_merged),
|
||||||
'co_posonlyargcount': dest.__code__.co_posonlyargcount,
|
'co_posonlyargcount': dest.__code__.co_posonlyargcount,
|
||||||
'co_nlocals': len(args_all),
|
'co_nlocals': len(args_all),
|
||||||
|
'co_flags': source.__code__.co_flags,
|
||||||
'co_varnames': args_all,
|
'co_varnames': args_all,
|
||||||
'co_filename': dest.__code__.co_filename,
|
'co_filename': dest.__code__.co_filename,
|
||||||
'co_name': dest.__code__.co_name,
|
'co_name': dest.__code__.co_name,
|
||||||
'co_firstlineno': dest.__code__.co_firstlineno,
|
'co_firstlineno': dest.__code__.co_firstlineno,
|
||||||
}
|
}
|
||||||
|
|
||||||
if hasattr(source, "__code__"):
|
|
||||||
replace_kwargs['co_flags'] = source.__code__.co_flags
|
|
||||||
|
|
||||||
if PY310:
|
if PY310:
|
||||||
replace_kwargs['co_linetable'] = dest.__code__.co_linetable
|
replace_kwargs['co_linetable'] = dest.__code__.co_linetable
|
||||||
else:
|
else:
|
||||||
@ -160,7 +151,7 @@ def _merge(
|
|||||||
len(kwonlyargs_merged),
|
len(kwonlyargs_merged),
|
||||||
_blank.__code__.co_nlocals,
|
_blank.__code__.co_nlocals,
|
||||||
_blank.__code__.co_stacksize,
|
_blank.__code__.co_stacksize,
|
||||||
source.__code__.co_flags if hasattr(source, "__code__") else dest.__code__.co_flags,
|
source.__code__.co_flags,
|
||||||
_blank.__code__.co_code, (), (),
|
_blank.__code__.co_code, (), (),
|
||||||
args_all, dest.__code__.co_filename,
|
args_all, dest.__code__.co_filename,
|
||||||
dest.__code__.co_name,
|
dest.__code__.co_name,
|
||||||
@ -180,9 +171,6 @@ def _merge(
|
|||||||
dest_ret = dest.__annotations__['return']
|
dest_ret = dest.__annotations__['return']
|
||||||
|
|
||||||
for v in ('__kwdefaults__', '__annotations__'):
|
for v in ('__kwdefaults__', '__annotations__'):
|
||||||
if not hasattr(source, v):
|
|
||||||
continue
|
|
||||||
|
|
||||||
out = getattr(source, v)
|
out = getattr(source, v)
|
||||||
if out is None:
|
if out is None:
|
||||||
out = {}
|
out = {}
|
||||||
|
@ -32,9 +32,8 @@ def unify(string: str) -> str:
|
|||||||
string = translit(string, reversed=True)
|
string = translit(string, reversed=True)
|
||||||
except LanguageDetectionError:
|
except LanguageDetectionError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
string = unify_punctuation(string)
|
return string.lower()
|
||||||
return string.lower().strip()
|
|
||||||
|
|
||||||
|
|
||||||
def fit_to_file_system(string: Union[str, Path], hidden_ok: bool = False) -> Union[str, Path]:
|
def fit_to_file_system(string: Union[str, Path], hidden_ok: bool = False) -> Union[str, Path]:
|
||||||
|
Loading…
Reference in New Issue
Block a user