fix: sanitizing file names correctly
parent 13b9c0b35e
commit 1794c0535e
@@ -41,4 +41,10 @@ if __name__ == "__main__":
         "d: 0"
     ]

-    music_kraken.cli.download(genre="test", command_list=cross_download, process_metadata_anyway=True)
+    bandcamp_test = [
+        "s: #a Ghost Bath",
+        "d: 0"
+    ]
+
+
+    music_kraken.cli.download(genre="test", command_list=cross_download, process_metadata_anyway=True)
@@ -3,7 +3,7 @@ import logging
 import gc
 import musicbrainzngs

-from .utils.shared import DEBUG
+from .utils.shared import DEBUG, DEBUG_LOGGIN
 from .utils.config import logging_settings, main_settings, read_config
 read_config()
 from . import cli
@@ -11,7 +11,7 @@ from . import cli

 # configure logger default
 logging.basicConfig(
-    level=logging_settings['log_level'] if not DEBUG else logging.DEBUG,
+    level=logging_settings['log_level'] if not DEBUG_LOGGIN else logging.DEBUG,
     format=logging_settings['logging_format'],
     handlers=[
         logging.FileHandler(main_settings['log_file']),
@@ -7,6 +7,7 @@ from tqdm import tqdm

 from .parents import DatabaseObject
 from ..utils.config import main_settings, logging_settings
+from ..utils.string_processing import fit_to_file_system


 LOGGER = logging.getLogger("target")
@@ -35,8 +36,8 @@ class Target(DatabaseObject):
             relative_to_music_dir: bool = False
     ) -> None:
         super().__init__(dynamic=dynamic)
-        self._file: Path = Path(file)
-        self._path: Path = Path(main_settings["music_directory"], path) if relative_to_music_dir else Path(path)
+        self._file: Path = Path(fit_to_file_system(file))
+        self._path: Path = fit_to_file_system(Path(main_settings["music_directory"], path) if relative_to_music_dir else Path(path))

         self.is_relative_to_music_dir: bool = relative_to_music_dir

@@ -281,9 +281,8 @@ class Page:
         return merge_together(music_object, new_music_object, do_compile=post_process)

     def fetch_object_from_source(self, source: Source, stop_at_level: int = 2, enforce_type: Type[DatabaseObject] = None, post_process: bool = True) -> Optional[DatabaseObject]:
-        obj_type = self.get_source_type(
-            source)
-        print("obj type", obj_type, self)
+        obj_type = self.get_source_type(source)
+
         if obj_type is None:
             return None

@@ -334,7 +333,7 @@ class Page:
     def download(self, music_object: DatabaseObject, genre: str, download_all: bool = False, process_metadata_anyway: bool = False) -> DownloadResult:
         # print("downloading")

-        self.fetch_details(music_object, stop_at_level=2)
+        # self.fetch_details(music_object, stop_at_level=1)

         naming_dict: NamingDict = NamingDict({"genre": genre})

@@ -28,6 +28,17 @@ if DEBUG:
     from ..utils.debug_utils import dump_to_file


+def _parse_artist_url(url: str) -> str:
+    parsed = urlparse(url)
+    return urlunparse((parsed.scheme, parsed.netloc, "/music/", "", "", ""))
+
+
+def _get_host(source: Source) -> str:
+    parsed = urlparse(source.url)
+    return urlunparse((parsed.scheme, parsed.netloc, "", "", "", ""))
+
+
+
 class BandcampTypes(Enum):
     ARTIST = "b"
     ALBUM = "a"
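For reference, the two module-level helpers introduced above reduce to plain urllib.parse calls. A standalone sketch of what they compute (the Bandcamp URL is a made-up example, and _get_host here takes the URL string directly instead of a Source object):

from urllib.parse import urlparse, urlunparse

def _parse_artist_url(url: str) -> str:
    # keep only scheme and host, then point at the artist's /music/ overview page
    parsed = urlparse(url)
    return urlunparse((parsed.scheme, parsed.netloc, "/music/", "", "", ""))

def _get_host(url: str) -> str:
    # keep only scheme and host, dropping path, query and fragment
    parsed = urlparse(url)
    return urlunparse((parsed.scheme, parsed.netloc, "", "", "", ""))

print(_parse_artist_url("https://example.bandcamp.com/album/some-album?from=search"))
# https://example.bandcamp.com/music/
print(_get_host("https://example.bandcamp.com/track/some-track"))
# https://example.bandcamp.com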
@@ -38,7 +49,6 @@ class Bandcamp(Page):
     # CHANGE
     SOURCE_TYPE = SourcePages.BANDCAMP
     LOGGER = logging_settings["bandcamp_logger"]
-    HOST = "https://onlysmile.bandcamp.com"

     def __init__(self, *args, **kwargs):
         self.connection: Connection = Connection(
@@ -52,7 +62,7 @@ class Bandcamp(Page):
         parsed_url = urlparse(source.url)
         path = parsed_url.path.replace("/", "")

-        if path == "":
+        if path == "" or path.startswith("music"):
             return Artist
         if path.startswith("album"):
             return Album
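With the loosened check, both the bare artist host and its /music/ overview page now resolve to Artist. A small illustration of the path handling (classify is a hypothetical stand-in for get_source_type, and the URLs are invented):

from urllib.parse import urlparse

def classify(url: str) -> str:
    # mirrors the updated check: an empty path or a /music/ overview page means "artist"
    path = urlparse(url).path.replace("/", "")
    if path == "" or path.startswith("music"):
        return "Artist"
    if path.startswith("album"):
        return "Album"
    return "other"

print(classify("https://example.bandcamp.com/"))         # Artist
print(classify("https://example.bandcamp.com/music/"))   # Artist
print(classify("https://example.bandcamp.com/album/x"))  # Album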
@@ -81,6 +91,7 @@ class Bandcamp(Page):
         )

         if object_type is BandcampTypes.ARTIST:
+            source_list = [Source(self.SOURCE_TYPE, _parse_artist_url(url))]
             return Artist(
                 name=name,
                 source_list=source_list
@@ -92,7 +103,7 @@ class Bandcamp(Page):
                 source_list=source_list,
                 artist_list=[
                     Artist(
-                        name=data["band_name"],
+                        name=data["band_name"].strip(),
                         source_list=[
                             Source(self.SOURCE_TYPE, data["item_url_root"])
                         ]
@@ -102,7 +113,7 @@ class Bandcamp(Page):

         if object_type is BandcampTypes.SONG:
             return Song(
-                title=name,
+                title=name.strip(),
                 source_list=source_list,
                 main_artist_list=[
                     Artist(
@@ -127,7 +138,7 @@ class Bandcamp(Page):
             return results

         if DEBUG:
-            dump_to_file("bandcamp_response.json", r.text, is_json=True, exit_after_dump=False)
+            dump_to_file("bandcamp_search_response.json", r.text, is_json=True, exit_after_dump=False)

         data = r.json()

@@ -172,20 +183,20 @@ class Bandcamp(Page):
             if li is None and li['href'] is not None:
                 continue

-            source_list.append(Source.match_url(li['href'], referer_page=self.SOURCE_TYPE))
+            source_list.append(Source.match_url(_parse_artist_url(li['href']), referer_page=self.SOURCE_TYPE))

         return Artist(
             name=name,
             source_list=source_list
         )

-    def _parse_album(self, soup: BeautifulSoup) -> List[Album]:
+    def _parse_album(self, soup: BeautifulSoup, initial_source: Source) -> List[Album]:
         title = None
         source_list: List[Source] = []

         a = soup.find("a")
         if a is not None and a["href"] is not None:
-            source_list.append(Source(self.SOURCE_TYPE, self.HOST + a["href"]))
+            source_list.append(Source(self.SOURCE_TYPE, _get_host(initial_source) + a["href"]))

         title_p = soup.find("p", {"class": "title"})
         if title_p is not None:
@@ -194,15 +205,12 @@ class Bandcamp(Page):
         return Album(title=title, source_list=source_list)

     def _parse_artist_data_blob(self, data_blob: dict, artist_url: str):
-        if DEBUG:
-            dump_to_file("bandcamp_data_blob.json", json.dumps(data_blob), is_json=True, exit_after_dump=False)
-
         parsed_artist_url = urlparse(artist_url)
         album_list: List[Album] = []

         for album_json in data_blob.get("buyfulldisco", {}).get("tralbums", []):
             album_list.append(Album(
-                title=album_json["title"],
+                title=album_json["title"].strip(),
                 source_list=[Source(
                     self.SOURCE_TYPE,
                     urlunparse((parsed_artist_url.scheme, parsed_artist_url.netloc, album_json["page_url"], "", "", ""))
@@ -229,15 +237,17 @@ class Bandcamp(Page):
         html_music_grid = soup.find("ol", {"id": "music-grid"})
         if html_music_grid is not None:
             for subsoup in html_music_grid.find_all("li"):
-                artist.main_album_collection.append(self._parse_album(soup=subsoup))
+                artist.main_album_collection.append(self._parse_album(soup=subsoup, initial_source=source))

-        data_blob_soup = soup.find("div", {"id": "pagedata"})
-        if data_blob_soup is not None:
+        for i, data_blob_soup in enumerate(soup.find_all("div", {"id": ["pagedata", "collectors-data"]})):
             data_blob = data_blob_soup["data-blob"]

+            if DEBUG:
+                dump_to_file(f"bandcamp_artist_data_blob_{i}.json", data_blob, is_json=True, exit_after_dump=False)
+
             if data_blob is not None:
                 artist.main_album_collection.extend(
-                    self._parse_artist_data_blob(json.loads(data_blob), source.url)
+                    self._parse_artist_data_blob(json.loads(data_blob), source.url)
                 )

         artist.source_collection.append(source)
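The rewritten loop relies on BeautifulSoup accepting a list of attribute values, so both the pagedata and collectors-data blobs are picked up in one pass. A minimal check with made-up markup:

from bs4 import BeautifulSoup

html = """
<div id="pagedata" data-blob='{"a": 1}'></div>
<div id="collectors-data" data-blob='{"b": 2}'></div>
"""

soup = BeautifulSoup(html, "html.parser")
# a list as the attribute value matches either id
for i, blob_div in enumerate(soup.find_all("div", {"id": ["pagedata", "collectors-data"]})):
    print(i, blob_div["data-blob"])
# 0 {"a": 1}
# 1 {"b": 2}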
@@ -245,9 +255,9 @@ class Bandcamp(Page):

     def _parse_track_element(self, track: dict) -> Optional[Song]:
         return Song(
-            title=track["item"]["name"],
+            title=track["item"]["name"].strip(),
             source_list=[Source(self.SOURCE_TYPE, track["item"]["mainEntityOfPage"])],
-            tracksort=track["position"]
+            tracksort=int(track["position"])
         )

     def fetch_album(self, source: Source, stop_at_level: int = 1) -> Album:
@@ -269,13 +279,13 @@ class Bandcamp(Page):

         artist_source_list = []
         if "@id" in artist_data:
-            artist_source_list=[Source(self.SOURCE_TYPE, artist_data["@id"])]
+            artist_source_list=[Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"]))]
         album = Album(
-            title=data["name"],
+            title=data["name"].strip(),
             source_list=[Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"]))],
             date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"),
             artist_list=[Artist(
-                name=artist_data["name"],
+                name=artist_data["name"].strip(),
                 source_list=artist_source_list
             )]
         )
@@ -331,16 +341,16 @@ class Bandcamp(Page):
                 mp3_url = value

         song = Song(
-            title=data["name"],
+            title=data["name"].strip(),
             source_list=[Source(self.SOURCE_TYPE, data.get("mainEntityOfPage", data["@id"]), adio_url=mp3_url)],
             album_list=[Album(
-                title=album_data["name"],
+                title=album_data["name"].strip(),
                 date=ID3Timestamp.strptime(data["datePublished"], "%d %b %Y %H:%M:%S %Z"),
                 source_list=[Source(self.SOURCE_TYPE, album_data["@id"])]
             )],
             main_artist_list=[Artist(
-                name=artist_data["name"],
-                source_list=[Source(self.SOURCE_TYPE, artist_data["@id"])]
+                name=artist_data["name"].strip(),
+                source_list=[Source(self.SOURCE_TYPE, _parse_artist_url(artist_data["@id"]))]
             )],
             lyrics_list=self._fetch_lyrics(soup=soup)
         )
@@ -3,6 +3,7 @@ import random
 from .config import main_settings

 DEBUG = True
+DEBUG_LOGGIN = DEBUG and False
 DEBUG_YOUTUBE_INITIALIZING = DEBUG and False
 DEBUG_PAGES = DEBUG and False

@@ -1,4 +1,5 @@
-from typing import Tuple
+from typing import Tuple, Union
+from pathlib import Path

 from transliterate.exceptions import LanguageDetectionError
 from transliterate import translit
@@ -25,20 +26,26 @@ def unify(string: str) -> str:
     return string.lower()


-def fit_to_file_system(string: str) -> str:
-    string = string.strip()
-
-    while string[0] == ".":
-        if len(string) == 0:
-            return string
-
-        string = string[1:]
-
-    string = string.replace("/", "_").replace("\\", "_")
-
-    string = sanitize_filename(string)
-
-    return string
+def fit_to_file_system(string: Union[str, Path]) -> Union[str, Path]:
+    def fit_string(string: str) -> str:
+        if string == "/":
+            return "/"
+        string = string.strip()
+
+        while string[0] == ".":
+            if len(string) == 0:
+                return string
+
+            string = string[1:]
+
+        string = string.replace("/", "_").replace("\\", "_")
+        string = sanitize_filename(string)
+        return string
+
+    if isinstance(string, Path):
+        return Path(*(fit_string(part) for part in string.parts))
+    else:
+        return fit_string(string)


 def clean_song_title(raw_song_title: str, artist_name: str) -> str:
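Sanitizing per path component is what keeps the directory layout intact while each individual name gets cleaned. A rough standalone sketch of the idea (fit_string here is a simplified stand-in that skips the sanitize_filename call, and the input path is invented):

from pathlib import Path
from typing import Union

def fit_string(part: str) -> str:
    # simplified stand-in: keep the root of absolute paths, trim whitespace,
    # drop leading dots and neutralize separators inside a single component
    if part == "/":
        return "/"
    part = part.strip()
    while part.startswith("."):
        part = part[1:]
    return part.replace("/", "_").replace("\\", "_")

def fit_to_file_system(path: Union[str, Path]) -> Union[str, Path]:
    if isinstance(path, Path):
        # sanitize every component separately so the directory layout itself is preserved
        return Path(*(fit_string(part) for part in path.parts))
    return fit_string(path)

dirty = Path("music", " Ghost Bath ", ".hidden album", "song one.mp3")
print(fit_to_file_system(dirty))  # music/Ghost Bath/hidden album/song one.mp3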