86 lines
2.5 KiB
Python
86 lines
2.5 KiB
Python
from typing import Generator, Type, Dict
|
|
import os.path
|
|
from pathlib import Path
|
|
|
|
import requests
|
|
from ics import Calendar, Event
|
|
|
|
from ..utils import config, error, prompt, ICS_FILE, date
|
|
from ..feeds import *
|
|
|
|
|
|
# Registry of the feed names that may appear in the ``active_feeds`` config
# entry, mapped to the feed class implementing each one.
NAME_TO_FEED: Dict[str, Type[Feed]] = {
    "mastodon": MastodonFeed,
    "twitter": TwitterFeed,
}
|
|
|
|
|
|
class Routine:
    """Helpers for resolving feeds, loading the calendar and rendering messages."""

    @staticmethod
    def iter_feeds() -> Generator[Type[Feed], None, None]:
        """Yield the feed class for every name listed under ``active_feeds``.

        Yields:
            The class registered in ``NAME_TO_FEED`` for each configured name,
            in configuration order.

        Raises:
            ValueError: If a configured name has no entry in ``NAME_TO_FEED``.
        """
        for feed in config.get_field("active_feeds", []):
            if feed not in NAME_TO_FEED:
                raise ValueError(f"Feed {feed} is not implemented.")

            yield NAME_TO_FEED[feed]

    @staticmethod
    def init_feed(feed: Type[Feed]) -> Feed:
        """Instantiate ``feed`` from its stored config, prompting once on failure.

        The keyword arguments are read from the ``feed.<__config_name__>``
        config entry. If construction fails, the user is asked for credentials
        via ``feed.prompt_auth()``, the answer is stored in the config, and
        construction is retried exactly once.

        Args:
            feed: The feed class to instantiate.

        Returns:
            The constructed feed instance.

        Raises:
            error.InvalidCredential: If construction still fails after the
                credentials were prompted for.
        """
        config_key = f"feed.{feed.__config_name__}"

        try:
            return feed(**config.get_field(config_key, {}))
        except (TypeError, error.InvalidCredential):
            # Missing/unexpected constructor arguments or rejected credentials:
            # fall through and ask the user for fresh ones.
            pass

        # Pass the prompted values through as-is. Wrapping them in ``{...}``
        # would build a *set*, which can neither hold a dict nor be splatted
        # with ``**`` below.
        # NOTE(review): assumes prompt_auth() returns a mapping of constructor
        # keyword arguments -- confirm against the Feed implementations.
        config.set_field(config_key, feed.prompt_auth(), update_dict=True)

        try:
            return feed(**config.get_field(config_key, {}))
        except (TypeError, error.InvalidCredential) as exc:
            raise error.InvalidCredential(
                f"Invalid credentials for {feed.__name__}."
            ) from exc

    @staticmethod
    def _load_cached_ics() -> Calendar:
        """Parse the local ``ICS_FILE`` copy into a ``Calendar``."""
        # iCalendar data is UTF-8 by default; do not rely on the platform's
        # locale encoding.
        with ICS_FILE.open("r", encoding="utf-8") as f:
            return Calendar(f.read())

    @staticmethod
    def _refresh_ics_file() -> None:
        """Copy or download the configured calendar into ``ICS_FILE``."""
        ics_url = config.get_field("ics_url")
        # Truthiness also covers a missing entry (``None``), not only "".
        if not ics_url:
            ics_url = prompt.for_string("url or path to the ICS file")
            config.set_field("ics_url", ics_url)

        origin_path = Path(ics_url)

        if origin_path.is_file():
            # Read the source completely before opening the target for
            # writing, so a source that *is* the cache file is not truncated.
            with origin_path.open("rb") as f:
                payload = f.read()
        else:
            r = requests.get(ics_url, timeout=30)
            # Do not cache an HTTP error page as if it were calendar data.
            r.raise_for_status()
            payload = r.content

        with ICS_FILE.open("wb") as t:
            t.write(payload)

    @staticmethod
    def get_ics(use_cached: bool = False) -> Calendar:
        """Return the calendar, refreshing the local copy unless told otherwise.

        Args:
            use_cached: When true and ``ICS_FILE`` already exists, parse it
                without touching the configured source.

        Returns:
            The calendar parsed from ``ICS_FILE``.

        Raises:
            requests.RequestException: If the source is a URL and the download
                fails, times out, or returns an HTTP error status.
        """
        if not (use_cached and ICS_FILE.exists()):
            Routine._refresh_ics_file()

        return Routine._load_cached_ics()

    @staticmethod
    def get_event_string(event: Event) -> str:
        """Render the configured message template for ``event``.

        The template is looked up under ``messages[event.name]["message"]`` in
        the config. Each ``{key}`` placeholder is replaced by the matching
        value from ``date.event_formatting_values(event, "de")``.

        Args:
            event: The calendar event to render a message for.

        Returns:
            The rendered message with surrounding whitespace stripped, or an
            empty string when nothing is configured for the event's name.
        """
        event_config = config.get_field("messages", {}).get(event.name, {})
        if not event_config:
            return ""

        message = event_config.get("message", "")

        data = date.event_formatting_values(event, "de")

        for key, value in data.items():
            # ``str()`` keeps non-string formatting values from raising in
            # ``str.replace``.
            message = message.replace(f"{{{key}}}", str(value))

        return message.strip()

    def run(self):
        """Run the routine. Currently a placeholder that does nothing."""
        pass
|
|
|