Compare commits
4 Commits
d19406d3b4
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e85681a708 | ||
|
|
0c4c41ff84 | ||
|
|
6be934e374 | ||
|
|
420deb16cf |
@@ -52,9 +52,7 @@ class TwitterFeed(Feed):
|
||||
)
|
||||
logged_in = True
|
||||
|
||||
config.set_field(self.__config_name__, {
|
||||
'cookies': dict(self.client.http.client.cookies)
|
||||
}, update_dict=True)
|
||||
config.set_field(f"feed.{self.__config_name__}.cookies", dict(self.client.http.client.cookies), update_dict=True)
|
||||
|
||||
def post(self, message: str):
|
||||
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
from typing import Generator, Type, Dict
|
||||
import os.path
|
||||
from pathlib import Path
|
||||
|
||||
from ..utils import config, error
|
||||
import requests
|
||||
from ics import Calendar, Event
|
||||
|
||||
from ..utils import config, error, prompt, ICS_FILE, date
|
||||
from ..feeds import *
|
||||
|
||||
|
||||
@@ -21,21 +26,60 @@ class Routine:
|
||||
|
||||
@staticmethod
|
||||
def init_feed(feed: Type[Feed]) -> Feed:
|
||||
feed_config = config.get_field(feed.__config_name__, {})
|
||||
feed_config = config.get_field(f"feed.{feed.__config_name__}", {})
|
||||
|
||||
try:
|
||||
feed_instance = feed(**feed_config)
|
||||
return feed(**feed_config)
|
||||
except (TypeError, error.InvalidCredential) as e:
|
||||
print(feed_config)
|
||||
raise e
|
||||
config.set_field(feed.__name__, feed.prompt_auth(), update_dict=True)
|
||||
feed_config = config.get_field(feed.__config_name__, {})
|
||||
config.set_field(f"feed.{feed.__config_name__}", {feed.prompt_auth()}, update_dict=True)
|
||||
feed_config = config.get_field(f"feed.{feed.__config_name__}", {})
|
||||
|
||||
try:
|
||||
return feed(**feed_config)
|
||||
except (TypeError, error.InvalidCredential):
|
||||
raise error.InvalidCredential(f"Invalid credentials for {feed.__name__}.")
|
||||
|
||||
@staticmethod
|
||||
def get_ics(use_cached: bool = False) -> Calendar:
|
||||
if use_cached and ICS_FILE.exists():
|
||||
with ICS_FILE.open("r") as f:
|
||||
return Calendar(f.read())
|
||||
|
||||
ics_url = config.get_field("ics_url")
|
||||
if not len(ics_url):
|
||||
ics_url = prompt.for_string("url or path to the ICS file")
|
||||
config.set_field("ics_url", ics_url)
|
||||
|
||||
origin_path = Path(ics_url)
|
||||
|
||||
if origin_path.is_file():
|
||||
with origin_path.open("rb") as f:
|
||||
with ICS_FILE.open("wb") as t:
|
||||
t.write(f.read())
|
||||
else:
|
||||
r = requests.get(ics_url)
|
||||
with ICS_FILE.open("wb") as t:
|
||||
t.write(r.content)
|
||||
|
||||
with ICS_FILE.open("r") as f:
|
||||
return Calendar(f.read())
|
||||
|
||||
|
||||
@staticmethod
|
||||
def get_event_string(event: Event) -> str:
|
||||
event_config = config.get_field("messages", {}).get(event.name, {})
|
||||
if not len(event_config):
|
||||
return ""
|
||||
|
||||
message = event_config.get("message", "")
|
||||
|
||||
data = date.event_formatting_values(event, "de")
|
||||
|
||||
for key, value in data.items():
|
||||
message = message.replace(f"{{{key}}}", value)
|
||||
|
||||
return message.strip()
|
||||
|
||||
def run(self):
|
||||
pass
|
||||
|
||||
|
||||
@@ -1,10 +1,16 @@
|
||||
from . import Routine
|
||||
from ..feeds import Feed
|
||||
from ..feeds import Feed, TwitterFeed
|
||||
from ..utils import date
|
||||
|
||||
from datetime import datetime
|
||||
from ics import Calendar
|
||||
|
||||
|
||||
class Development(Routine):
|
||||
def run(self):
|
||||
for feed_class in self.iter_feeds():
|
||||
feed: Feed = self.init_feed(feed_class)
|
||||
def test_feeds(self):
|
||||
for feed in self.iter_feeds():
|
||||
feed_instance = self.init_feed(feed)
|
||||
feed_instance.post(f"Hello World!!! ({datetime.now()})")
|
||||
|
||||
feed.post(message="This worked second try!")
|
||||
def run(self):
|
||||
self.test_feeds()
|
||||
|
||||
@@ -1,12 +1,23 @@
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
import platformdirs
|
||||
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
PROGRAM_NAME: str = "publish-meetups"
|
||||
PROGRAM_DATA_DIR: str = platformdirs.user_config_path(appname=PROGRAM_NAME)
|
||||
PROGRAM_DATA_DIR: Path = platformdirs.user_config_path(appname=PROGRAM_NAME)
|
||||
PROGRAM_DATA_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
ICS_FILE = Path(PROGRAM_DATA_DIR / "meetup.ics")
|
||||
|
||||
__all__ = ["prompt", "PROGRAM_DATA_DIR", "PROGRAM_NAME", "errors", "config"]
|
||||
|
||||
__all__ = [
|
||||
"prompt",
|
||||
"PROGRAM_DATA_DIR",
|
||||
"PROGRAM_NAME",
|
||||
"errors",
|
||||
"config",
|
||||
"ICS_FILE",
|
||||
"date",
|
||||
]
|
||||
|
||||
@@ -7,13 +7,26 @@ from . import PROGRAM_DATA_DIR, PROGRAM_NAME
|
||||
|
||||
logger = logging.getLogger("config")
|
||||
_config: dict = {
|
||||
"ics_url": "",
|
||||
|
||||
"locale": "en",
|
||||
"date_format": "%Y-%m-%d %H:%M:%S",
|
||||
"time_format": "%H:%M",
|
||||
"time_format_full_hour": "%H",
|
||||
|
||||
"weekday_translations": {},
|
||||
"months_translations": {},
|
||||
|
||||
"active_feeds": [
|
||||
"mastodon",
|
||||
"twitter",
|
||||
],
|
||||
"mastodon": {},
|
||||
"twitter": {},
|
||||
"lemmy": {},
|
||||
|
||||
"feed": {
|
||||
"mastodon": {},
|
||||
"twitter": {},
|
||||
"lemmy": {},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -38,16 +51,33 @@ def write():
|
||||
|
||||
def get_field(__name: str, default=None):
|
||||
global _config
|
||||
return _config.get(__name, default)
|
||||
|
||||
_parts = __name.split(".")
|
||||
|
||||
_field_container = _config
|
||||
for _part in _parts[:-1]:
|
||||
_field_container = _field_container.get(_part, {})
|
||||
|
||||
return _field_container.get(_parts[-1], default)
|
||||
|
||||
|
||||
def set_field(__name: str, __value, update_dict=False):
|
||||
global _config
|
||||
|
||||
_parts = __name.split(".")
|
||||
|
||||
_field_container = _config
|
||||
for _part in _parts[:-1]:
|
||||
if _part not in _field_container:
|
||||
_field_container[_part] = {}
|
||||
|
||||
_field_container = _field_container[_part]
|
||||
|
||||
|
||||
if update_dict and isinstance(__value, dict) and isinstance(_config.get(__name), dict):
|
||||
_config[__name].update(__value)
|
||||
_field_container[_parts[-1]].update(__value)
|
||||
else:
|
||||
_config[__name] = __value
|
||||
_field_container[_parts[-1]] = __value
|
||||
|
||||
write()
|
||||
|
||||
|
||||
40
publish_meetups/utils/date.py
Normal file
40
publish_meetups/utils/date.py
Normal file
@@ -0,0 +1,40 @@
|
||||
from datetime import datetime
|
||||
from ics import Calendar, Event
|
||||
|
||||
from . import config
|
||||
|
||||
|
||||
def to_date_string(date_time: datetime) -> str:
|
||||
date_format = config.get_field("date_format", "%Y-%m-%d %H:%M:%S")
|
||||
|
||||
weekday_translations = config.get_field("weekday_translations", {})
|
||||
months_translations = config.get_field("months_translations", {})
|
||||
|
||||
parsed = date_time.strftime(date_format)
|
||||
|
||||
for key, value in weekday_translations.items():
|
||||
parsed = parsed.replace(key, value)
|
||||
|
||||
for key, value in months_translations.items():
|
||||
parsed = parsed.replace(key, value)
|
||||
|
||||
return parsed.strip()
|
||||
|
||||
|
||||
def to_time_string(date_time: datetime) -> str:
|
||||
time_format = config.get_field("time_format", "%H:%M")
|
||||
time_format_full_hour = config.get_field("time_format_full_hour", time_format)
|
||||
|
||||
parsed = date_time.strftime(time_format)
|
||||
if int(date_time.minute) == 0:
|
||||
parsed = date_time.strftime(time_format_full_hour)
|
||||
|
||||
return parsed.strip()
|
||||
|
||||
|
||||
def event_formatting_values(event: Event, locale="en") -> dict:
|
||||
return {
|
||||
"time": to_time_string(event.begin),
|
||||
"date": to_date_string(event.begin),
|
||||
"date_humanized": event.begin.humanize(locale=locale),
|
||||
}
|
||||
@@ -37,6 +37,8 @@ dependencies = [
|
||||
"twikit~=1.1.12",
|
||||
"toml~=0.10.2",
|
||||
"platformdirs~=3.2.0",
|
||||
"ics~=0.7.2",
|
||||
"requests~=2.31.0"
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
|
||||
Reference in New Issue
Block a user