Gracefully shut down each component before exiting

This commit is contained in:
Elara 2023-03-26 14:34:29 -07:00
parent 947ab7fbcb
commit 6113ac019e
12 changed files with 230 additions and 130 deletions

View File

@ -10,7 +10,7 @@ import (
"go.arsenm.dev/logger/log"
)
func initCallNotifs(ctx context.Context, dev *infinitime.Device) error {
func initCallNotifs(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error {
// Connect to system bus. This connection is for method calls.
conn, err := utils.NewSystemBusConn(ctx)
if err != nil {
@ -53,49 +53,55 @@ func initCallNotifs(ctx context.Context, dev *infinitime.Device) error {
var respHandlerOnce sync.Once
var callObj dbus.BusObject
wg.Add(1)
go func() {
// For every message received
for event := range callCh {
// Get path to call object
callPath := event.Body[0].(dbus.ObjectPath)
// Get call object
callObj = conn.Object("org.freedesktop.ModemManager1", callPath)
defer wg.Done("callNotifs")
for {
select {
case event := <-callCh:
// Get path to call object
callPath := event.Body[0].(dbus.ObjectPath)
// Get call object
callObj = conn.Object("org.freedesktop.ModemManager1", callPath)
// Get phone number from call object using method call connection
phoneNum, err := getPhoneNum(conn, callObj)
if err != nil {
log.Error("Error getting phone number").Err(err).Send()
continue
}
// Send call notification to InfiniTime
resCh, err := dev.NotifyCall(phoneNum)
if err != nil {
continue
}
go respHandlerOnce.Do(func() {
// Wait for PineTime response
for res := range resCh {
switch res {
case infinitime.CallStatusAccepted:
// Attempt to accept call
err = acceptCall(ctx, conn, callObj)
if err != nil {
log.Warn("Error accepting call").Err(err).Send()
}
case infinitime.CallStatusDeclined:
// Attempt to decline call
err = declineCall(ctx, conn, callObj)
if err != nil {
log.Warn("Error declining call").Err(err).Send()
}
case infinitime.CallStatusMuted:
// Warn about unimplemented muting
log.Warn("Muting calls is not implemented").Send()
}
// Get phone number from call object using method call connection
phoneNum, err := getPhoneNum(conn, callObj)
if err != nil {
log.Error("Error getting phone number").Err(err).Send()
continue
}
})
// Send call notification to InfiniTime
resCh, err := dev.NotifyCall(phoneNum)
if err != nil {
continue
}
go respHandlerOnce.Do(func() {
// Wait for PineTime response
for res := range resCh {
switch res {
case infinitime.CallStatusAccepted:
// Attempt to accept call
err = acceptCall(ctx, conn, callObj)
if err != nil {
log.Warn("Error accepting call").Err(err).Send()
}
case infinitime.CallStatusDeclined:
// Attempt to decline call
err = declineCall(ctx, conn, callObj)
if err != nil {
log.Warn("Error declining call").Err(err).Send()
}
case infinitime.CallStatusMuted:
// Warn about unimplemented muting
log.Warn("Muting calls is not implemented").Send()
}
}
})
case <-ctx.Done():
return
}
}
}()

11
fuse.go
View File

@ -11,7 +11,7 @@ import (
"go.arsenm.dev/logger/log"
)
func startFUSE(ctx context.Context, dev *infinitime.Device) error {
func startFUSE(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error {
// This is where we'll mount the FS
err := os.MkdirAll(k.String("fuse.mountpoint"), 0o755)
if err != nil && !os.IsExist(err) {
@ -55,7 +55,12 @@ func startFUSE(ctx context.Context, dev *infinitime.Device) error {
return err
}
// Wait until unmount before exiting
go server.Serve()
wg.Add(1)
go func() {
defer wg.Done("fuse")
<-ctx.Done()
server.Unmount()
}()
return nil
}

4
go.mod
View File

@ -4,6 +4,8 @@ go 1.18
replace fyne.io/x/fyne => github.com/metal3d/fyne-x v0.0.0-20220508095732-177117e583fb
replace go.arsenm.dev/drpc => /home/elara/Code/drpc
require (
fyne.io/fyne/v2 v2.3.0
fyne.io/x/fyne v0.0.0-20220107050838-c4a1de51d4ce
@ -74,7 +76,7 @@ require (
golang.org/x/mobile v0.0.0-20221110043201-43a038452099 // indirect
golang.org/x/mod v0.7.0 // indirect
golang.org/x/net v0.4.0 // indirect
golang.org/x/sys v0.3.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/tools v0.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
honnef.co/go/js/dom v0.0.0-20221001195520-26252dedbe70 // indirect

3
go.sum
View File

@ -341,6 +341,7 @@ github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/lucor/goinfo v0.0.0-20210802170112-c078a2b0f08b/go.mod h1:PRq09yoB+Q2OJReAmwzKivcYyremnibWGbK7WfftHzc=
github.com/magiconair/properties v1.8.5/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
@ -756,6 +757,8 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

71
main.go
View File

@ -26,6 +26,8 @@ import (
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"
@ -74,21 +76,7 @@ func main() {
LogLevel: level,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sigCh := make(chan os.Signal, 1)
go func() {
<-sigCh
cancel()
time.Sleep(200 * time.Millisecond)
os.Exit(0)
}()
signal.Notify(
sigCh,
syscall.SIGINT,
syscall.SIGTERM,
)
ctx := context.Background()
// Connect to InfiniTime with default options
dev, err := infinitime.Connect(ctx, opts)
@ -145,57 +133,88 @@ func main() {
log.Error("Error setting current time on connected InfiniTime").Err(err).Send()
}
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
sig := <-sigCh
log.Warn("Signal received, shutting down").Stringer("signal", sig).Send()
cancel()
}()
wg := WaitGroup{&sync.WaitGroup{}}
// Initialize music controls
err = initMusicCtrl(ctx, dev)
err = initMusicCtrl(ctx, wg, dev)
if err != nil {
log.Error("Error initializing music control").Err(err).Send()
}
// Start control socket
err = initCallNotifs(ctx, dev)
err = initCallNotifs(ctx, wg, dev)
if err != nil {
log.Error("Error initializing call notifications").Err(err).Send()
}
// Initialize notification relay
err = initNotifRelay(ctx, dev)
err = initNotifRelay(ctx, wg, dev)
if err != nil {
log.Error("Error initializing notification relay").Err(err).Send()
}
// Initializa weather
err = initWeather(ctx, dev)
err = initWeather(ctx, wg, dev)
if err != nil {
log.Error("Error initializing weather").Err(err).Send()
}
// Initialize metrics collection
err = initMetrics(ctx, dev)
err = initMetrics(ctx, wg, dev)
if err != nil {
log.Error("Error intializing metrics collection").Err(err).Send()
}
// Initialize metrics collection
err = initPureMaps(ctx, dev)
// Initialize puremaps integration
err = initPureMaps(ctx, wg, dev)
if err != nil {
log.Error("Error intializing puremaps integration").Err(err).Send()
}
// Start fuse socket
if k.Bool("fuse.enabled") {
err = startFUSE(ctx, dev)
err = startFUSE(ctx, wg, dev)
if err != nil {
log.Error("Error starting fuse socket").Err(err).Send()
}
}
// Start control socket
err = startSocket(ctx, dev)
err = startSocket(ctx, wg, dev)
if err != nil {
log.Error("Error starting socket").Err(err).Send()
}
// Block forever
select {}
wg.Wait()
}
type x struct {
n int
*sync.WaitGroup
}
func (xy *x) Add(i int) {
xy.n += i
xy.WaitGroup.Add(i)
fmt.Println("add: counter:", xy.n)
}
func (xy *x) Done() {
xy.n -= 1
xy.WaitGroup.Done()
fmt.Println("done: counter:", xy.n)
}
func onReqPasskey() (uint32, error) {

View File

@ -18,7 +18,7 @@ const (
progressProperty = interfaceName + ".progress"
)
func initPureMaps(ctx context.Context, dev *infinitime.Device) error {
func initPureMaps(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error {
// Connect to session bus. This connection is for method calls.
conn, err := utils.NewSessionBusConn(ctx)
if err != nil {
@ -59,7 +59,10 @@ func initPureMaps(ctx context.Context, dev *infinitime.Device) error {
}
}
wg.Add(1)
go func() {
defer wg.Done("pureMaps")
signalCh := make(chan *dbus.Message, 10)
monitorConn.Eavesdrop(signalCh)

View File

@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"path/filepath"
"time"
"go.arsenm.dev/infinitime"
@ -11,7 +12,7 @@ import (
_ "modernc.org/sqlite"
)
func initMetrics(ctx context.Context, dev *infinitime.Device) error {
func initMetrics(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error {
// If metrics disabled, return nil
if !k.Bool("metrics.enabled") {
return nil
@ -125,6 +126,13 @@ func initMetrics(ctx context.Context, dev *infinitime.Device) error {
}()
}
wg.Add(1)
go func() {
defer wg.Done("metrics")
<-ctx.Done()
db.Close()
}()
log.Info("Initialized metrics collection").Send()
return nil

View File

@ -20,6 +20,7 @@ package main
import (
"context"
"go.arsenm.dev/infinitime"
"go.arsenm.dev/itd/mpris"
@ -27,7 +28,7 @@ import (
"go.arsenm.dev/logger/log"
)
func initMusicCtrl(ctx context.Context, dev *infinitime.Device) error {
func initMusicCtrl(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error {
mpris.Init(ctx)
maps := k.Strings("notifs.translit.use")
@ -54,23 +55,31 @@ func initMusicCtrl(ctx context.Context, dev *infinitime.Device) error {
if err != nil {
return err
}
wg.Add(1)
go func() {
defer wg.Done("musicCtrl")
// For every music event received
for musicEvt := range musicEvtCh {
// Perform appropriate action based on event
switch musicEvt {
case infinitime.MusicEventPlay:
mpris.Play()
case infinitime.MusicEventPause:
mpris.Pause()
case infinitime.MusicEventNext:
mpris.Next()
case infinitime.MusicEventPrev:
mpris.Prev()
case infinitime.MusicEventVolUp:
mpris.VolUp(uint(k.Int("music.vol.interval")))
case infinitime.MusicEventVolDown:
mpris.VolDown(uint(k.Int("music.vol.interval")))
for {
select {
case musicEvt := <-musicEvtCh:
// Perform appropriate action based on event
switch musicEvt {
case infinitime.MusicEventPlay:
mpris.Play()
case infinitime.MusicEventPause:
mpris.Pause()
case infinitime.MusicEventNext:
mpris.Next()
case infinitime.MusicEventPrev:
mpris.Prev()
case infinitime.MusicEventVolUp:
mpris.VolUp(uint(k.Int("music.vol.interval")))
case infinitime.MusicEventVolDown:
mpris.VolDown(uint(k.Int("music.vol.interval")))
}
case <-ctx.Done():
return
}
}
}()

View File

@ -29,7 +29,7 @@ import (
"go.arsenm.dev/logger/log"
)
func initNotifRelay(ctx context.Context, dev *infinitime.Device) error {
func initNotifRelay(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error {
// Connect to dbus session bus
bus, err := utils.NewSessionBusConn(ctx)
if err != nil {
@ -54,43 +54,51 @@ func initNotifRelay(ctx context.Context, dev *infinitime.Device) error {
// Send events to channel
bus.Eavesdrop(notifCh)
wg.Add(1)
go func() {
defer wg.Done("notifRelay")
// For every event sent to channel
for v := range notifCh {
// If firmware is updating, skip
if firmwareUpdating {
continue
for {
select {
case v := <-notifCh:
// If firmware is updating, skip
if firmwareUpdating {
continue
}
// If body does not contain 5 elements, skip
if len(v.Body) < 5 {
continue
}
// Get requred fields
sender, summary, body := v.Body[0].(string), v.Body[3].(string), v.Body[4].(string)
// If fields are ignored in config, skip
if ignored(sender, summary, body) {
continue
}
maps := k.Strings("notifs.translit.use")
translit.Transliterators["custom"] = translit.Map(k.Strings("notifs.translit.custom"))
sender = translit.Transliterate(sender, maps...)
summary = translit.Transliterate(summary, maps...)
body = translit.Transliterate(body, maps...)
var msg string
// If summary does not exist, set message to body.
// If it does, set message to summary, two newlines, and then body
if summary == "" {
msg = body
} else {
msg = fmt.Sprintf("%s\n\n%s", summary, body)
}
dev.Notify(sender, msg)
case <-ctx.Done():
bus.Close()
return
}
// If body does not contain 5 elements, skip
if len(v.Body) < 5 {
continue
}
// Get requred fields
sender, summary, body := v.Body[0].(string), v.Body[3].(string), v.Body[4].(string)
// If fields are ignored in config, skip
if ignored(sender, summary, body) {
continue
}
maps := k.Strings("notifs.translit.use")
translit.Transliterators["custom"] = translit.Map(k.Strings("notifs.translit.custom"))
sender = translit.Transliterate(sender, maps...)
summary = translit.Transliterate(summary, maps...)
body = translit.Transliterate(body, maps...)
var msg string
// If summary does not exist, set message to body.
// If it does, set message to summary, two newlines, and then body
if summary == "" {
msg = body
} else {
msg = fmt.Sprintf("%s\n\n%s", summary, body)
}
dev.Notify(sender, msg)
}
}()

View File

@ -25,6 +25,7 @@ import (
"net"
"os"
"path/filepath"
"time"
"go.arsenm.dev/drpc/muxserver"
@ -41,7 +42,7 @@ var (
ErrDFUInvalidUpgType = errors.New("invalid upgrade type")
)
func startSocket(ctx context.Context, dev *infinitime.Device) error {
func startSocket(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error {
// Make socket directory if non-existant
err := os.MkdirAll(filepath.Dir(k.String("socket.path")), 0o755)
if err != nil {
@ -77,10 +78,13 @@ func startSocket(ctx context.Context, dev *infinitime.Device) error {
return err
}
go muxserver.New(mux).Serve(ctx, ln)
log.Info("Starting control socket").Str("path", k.String("socket.path")).Send()
// Log socket start
log.Info("Started control socket").Str("path", k.String("socket.path")).Send()
wg.Add(1)
go func() {
defer wg.Done("socket")
muxserver.New(mux).Serve(ctx, ln)
}()
return nil
}

16
waitgroup.go Normal file
View File

@ -0,0 +1,16 @@
package main
import (
"sync"
"go.arsenm.dev/logger/log"
)
type WaitGroup struct {
*sync.WaitGroup
}
func (wg WaitGroup) Done(c string) {
log.Info("Component stopped").Str("name", c).Send()
wg.WaitGroup.Done()
}

View File

@ -9,6 +9,7 @@ import (
"net/url"
"strconv"
"strings"
"time"
"go.arsenm.dev/infinitime"
@ -61,7 +62,14 @@ type OSMData []struct {
var sendWeatherCh = make(chan struct{}, 1)
func initWeather(ctx context.Context, dev *infinitime.Device) error {
func sleepCtx(ctx context.Context, d time.Duration) {
select {
case <-time.After(d):
case <-ctx.Done():
}
}
func initWeather(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error {
if !k.Bool("weather.enabled") {
return nil
}
@ -74,14 +82,21 @@ func initWeather(ctx context.Context, dev *infinitime.Device) error {
timer := time.NewTimer(time.Hour)
wg.Add(1)
go func() {
defer wg.Done("weather")
for {
_, ok := <-ctx.Done()
if !ok {
return
}
// Attempt to get weather
data, err := getWeather(ctx, lat, lon)
if err != nil {
log.Warn("Error getting weather data").Err(err).Send()
// Wait 15 minutes before retrying
time.Sleep(15 * time.Minute)
sleepCtx(ctx, 15*time.Minute)
continue
}
@ -174,6 +189,8 @@ func initWeather(ctx context.Context, dev *infinitime.Device) error {
select {
case <-timer.C:
case <-sendWeatherCh:
case <-ctx.Done():
return
}
}
}()