import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from functools import lru_cache
from pathlib import Path
from typing import List, Optional

from ..utils.config import main_settings
from ..utils.string_processing import fit_to_file_system


@dataclass
class CacheAttribute:
    module: str
    name: str

    created: datetime
    expires: datetime

    additional_info: dict = field(default_factory=dict)

    @property
    def id(self) -> str:
        return f"{self.module}_{self.name}"

    @property
    def is_valid(self) -> bool:
        # expires may still be an ISO string if it wasn't converted after loading the index
        if isinstance(self.expires, str):
            self.expires = datetime.fromisoformat(self.expires)
        return datetime.now() < self.expires

    def __eq__(self, other):
        if not isinstance(other, CacheAttribute):
            return NotImplemented
        return self.__dict__ == other.__dict__


@dataclass
class CacheResult:
    content: bytes
    attribute: CacheAttribute


class Cache:
    def __init__(self, module: str, logger: logging.Logger):
        self.module = module
        self.logger: logging.Logger = logger

        self._dir = main_settings["cache_directory"]
        self.index = Path(self._dir, "index.json")

        if not self.index.is_file():
            with self.index.open("w") as i:
                i.write(json.dumps([]))

        self.cached_attributes: List[CacheAttribute] = []
        self._id_to_attribute = {}

        self._time_fields = {"created", "expires"}
        with self.index.open("r") as i:
            try:
                for c in json.loads(i.read()):
                    for key in self._time_fields:
                        c[key] = datetime.fromisoformat(c[key])

                    ca = CacheAttribute(**c)
                    self.cached_attributes.append(ca)
                    self._id_to_attribute[ca.id] = ca
            except json.JSONDecodeError:
                # a corrupt index is treated as an empty cache
                pass

    # cached, so every module directory is only created once per cache instance
    @lru_cache()
    def _init_module(self, module: str) -> Path:
        """
        Create the module's cache directory if it doesn't exist yet.

        :param module: the module name (sub-directory of the cache directory)
        :return: the module path
        """
        r = Path(self._dir, module)
        r.mkdir(exist_ok=True, parents=True)
        return r

    def _write_index(self, indent: int = 4):
        _json = []
        for c in self.cached_attributes:
            # serialize a copy, so the datetime fields of the live objects stay untouched
            d = c.__dict__.copy()
            for key in self._time_fields:
                if not isinstance(d[key], str):
                    d[key] = d[key].isoformat()

            _json.append(d)

        with self.index.open("w") as f:
            f.write(json.dumps(_json, indent=indent))

    def _write_attribute(self, cached_attribute: CacheAttribute, write: bool = True) -> bool:
        existing_attribute: Optional[CacheAttribute] = self._id_to_attribute.get(cached_attribute.id)
        if existing_attribute is not None:
            # the attribute already exists
            if existing_attribute == cached_attribute:
                return True

            if existing_attribute.is_valid:
                return False

            # the existing entry has expired, so refresh it in place
            existing_attribute.__dict__.update(cached_attribute.__dict__)
        else:
            self.cached_attributes.append(cached_attribute)
            self._id_to_attribute[cached_attribute.id] = cached_attribute

        if write:
            self._write_index()

        return True

    def set(self, content: bytes, name: str, expires_in: float = 10, module: str = "", additional_info: dict = None):
        """
        Write content to the cache.

        :param content: the raw bytes to store
        :param name: the file name of the cache entry; an empty name is ignored
        :param expires_in: lifetime of the entry in days
        :param module: the module to store the entry under; defaults to this cache's module
        :param additional_info: arbitrary metadata stored alongside the entry in the index
        :return:
        """
        if name == "":
            return

        additional_info = additional_info or {}
        module = self.module if module == "" else module

        module_path = self._init_module(module)

        cache_attribute = CacheAttribute(
            module=module,
            name=name,
            created=datetime.now(),
            expires=datetime.now() + timedelta(days=expires_in),
            additional_info=additional_info,
        )
        self._write_attribute(cache_attribute)

        cache_path = fit_to_file_system(Path(module_path, name), hidden_ok=True)
        with cache_path.open("wb") as content_file:
            self.logger.debug(f"writing cache to {cache_path}")
            content_file.write(content)

    def get(self, name: str) -> Optional[CacheResult]:
        path = fit_to_file_system(Path(self._dir, self.module, name), hidden_ok=True)

        if not path.is_file():
            return None

        # check if the entry is outdated or missing from the index
        existing_attribute: Optional[CacheAttribute] = self._id_to_attribute.get(f"{self.module}_{name}")
        if existing_attribute is None or not existing_attribute.is_valid:
            return None

        with path.open("rb") as f:
            return CacheResult(content=f.read(), attribute=existing_attribute)
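
    # Minimal usage sketch (illustrative, not part of the class): assuming
    # main_settings["cache_directory"] points at a writable directory and a logging.Logger
    # is configured, a set/get round trip would look roughly like this
    # ("example_module" and the file name are arbitrary placeholders):
    #
    #     cache = Cache("example_module", logging.getLogger("cache"))
    #     cache.set(b"some payload", "search_result.html", expires_in=1)
    #     result = cache.get("search_result.html")
    #     if result is not None:
    #         print(result.attribute.expires, len(result.content))
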
    def clean(self):
        keep = set()

        for ca in self.cached_attributes.copy():
            if ca.name == "":
                continue

            file = fit_to_file_system(Path(self._dir, ca.module, ca.name), hidden_ok=True)

            if not ca.is_valid:
                self.logger.debug(f"deleting cache {ca.id}")
                if file.is_file():
                    file.unlink()
                self.cached_attributes.remove(ca)
                del self._id_to_attribute[ca.id]

            else:
                keep.add(file)

        # iterate through every module (folder)
        for module_path in self._dir.iterdir():
            if not module_path.is_dir():
                continue

            # delete all files that are not in keep
            for path in module_path.iterdir():
                if path.is_file() and path not in keep:
                    self.logger.info(f"Deleting cache {path}")
                    path.unlink()

            # delete all empty directories
            for path in module_path.iterdir():
                if path.is_dir() and not list(path.iterdir()):
                    self.logger.debug(f"Deleting cache directory {path}")
                    path.rmdir()

        self._write_index()

    def clear(self):
        """
        Delete every file in the cache directory and reset the index.

        :return:
        """

        for path in self._dir.iterdir():
            if path.is_dir():
                for file in path.iterdir():
                    file.unlink()
                path.rmdir()
            else:
                path.unlink()

        self.cached_attributes.clear()
        self._id_to_attribute.clear()

        self._write_index()

    def __repr__(self):
        return f"<Cache {self.module}>"
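

# Maintenance sketch (illustrative only): clean() removes expired entries and files the index
# no longer references, while clear() wipes the whole cache directory. With a Cache instance
# as in the sketch above, periodic housekeeping could look like:
#
#     cache.clean()   # drop expired / orphaned files, keep valid entries
#     # ...or, to start completely fresh:
#     cache.clear()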