Gracefully shut down each component before exiting

This commit is contained in:
Elara 2023-03-26 14:34:29 -07:00
parent 5ce83902a4
commit f7ac77273e
12 changed files with 230 additions and 130 deletions

View File

@ -10,7 +10,7 @@ import (
"go.arsenm.dev/logger/log"
)
func initCallNotifs(ctx context.Context, dev *infinitime.Device) error {
func initCallNotifs(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error {
// Connect to system bus. This connection is for method calls.
conn, err := utils.NewSystemBusConn(ctx)
if err != nil {
@ -53,9 +53,12 @@ func initCallNotifs(ctx context.Context, dev *infinitime.Device) error {
var respHandlerOnce sync.Once
var callObj dbus.BusObject
wg.Add(1)
go func() {
// For every message received
for event := range callCh {
defer wg.Done("callNotifs")
for {
select {
case event := <-callCh:
// Get path to call object
callPath := event.Body[0].(dbus.ObjectPath)
// Get call object
@ -96,6 +99,9 @@ func initCallNotifs(ctx context.Context, dev *infinitime.Device) error {
}
}
})
case <-ctx.Done():
return
}
}
}()

11
fuse.go
View File

@ -11,7 +11,7 @@ import (
"go.arsenm.dev/logger/log"
)
func startFUSE(ctx context.Context, dev *infinitime.Device) error {
func startFUSE(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error {
// This is where we'll mount the FS
err := os.MkdirAll(k.String("fuse.mountpoint"), 0o755)
if err != nil && !os.IsExist(err) {
@ -55,7 +55,12 @@ func startFUSE(ctx context.Context, dev *infinitime.Device) error {
return err
}
// Wait until unmount before exiting
go server.Serve()
wg.Add(1)
go func() {
defer wg.Done("fuse")
<-ctx.Done()
server.Unmount()
}()
return nil
}

4
go.mod
View File

@ -4,6 +4,8 @@ go 1.18
replace fyne.io/x/fyne => github.com/metal3d/fyne-x v0.0.0-20220508095732-177117e583fb
replace go.arsenm.dev/drpc => /home/elara/Code/drpc
require (
fyne.io/fyne/v2 v2.3.0
fyne.io/x/fyne v0.0.0-20220107050838-c4a1de51d4ce
@ -74,7 +76,7 @@ require (
golang.org/x/mobile v0.0.0-20221110043201-43a038452099 // indirect
golang.org/x/mod v0.7.0 // indirect
golang.org/x/net v0.4.0 // indirect
golang.org/x/sys v0.3.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/tools v0.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
honnef.co/go/js/dom v0.0.0-20221001195520-26252dedbe70 // indirect

3
go.sum
View File

@ -341,6 +341,7 @@ github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/lucor/goinfo v0.0.0-20210802170112-c078a2b0f08b/go.mod h1:PRq09yoB+Q2OJReAmwzKivcYyremnibWGbK7WfftHzc=
github.com/magiconair/properties v1.8.5/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
@ -756,6 +757,8 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

71
main.go
View File

@ -26,6 +26,8 @@ import (
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"
@ -74,21 +76,7 @@ func main() {
LogLevel: level,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sigCh := make(chan os.Signal, 1)
go func() {
<-sigCh
cancel()
time.Sleep(200 * time.Millisecond)
os.Exit(0)
}()
signal.Notify(
sigCh,
syscall.SIGINT,
syscall.SIGTERM,
)
ctx := context.Background()
// Connect to InfiniTime with default options
dev, err := infinitime.Connect(ctx, opts)
@ -145,57 +133,88 @@ func main() {
log.Error("Error setting current time on connected InfiniTime").Err(err).Send()
}
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
sig := <-sigCh
log.Warn("Signal received, shutting down").Stringer("signal", sig).Send()
cancel()
}()
wg := WaitGroup{&sync.WaitGroup{}}
// Initialize music controls
err = initMusicCtrl(ctx, dev)
err = initMusicCtrl(ctx, wg, dev)
if err != nil {
log.Error("Error initializing music control").Err(err).Send()
}
// Start control socket
err = initCallNotifs(ctx, dev)
err = initCallNotifs(ctx, wg, dev)
if err != nil {
log.Error("Error initializing call notifications").Err(err).Send()
}
// Initialize notification relay
err = initNotifRelay(ctx, dev)
err = initNotifRelay(ctx, wg, dev)
if err != nil {
log.Error("Error initializing notification relay").Err(err).Send()
}
// Initializa weather
err = initWeather(ctx, dev)
err = initWeather(ctx, wg, dev)
if err != nil {
log.Error("Error initializing weather").Err(err).Send()
}
// Initialize metrics collection
err = initMetrics(ctx, dev)
err = initMetrics(ctx, wg, dev)
if err != nil {
log.Error("Error intializing metrics collection").Err(err).Send()
}
// Initialize metrics collection
err = initPureMaps(ctx, dev)
// Initialize puremaps integration
err = initPureMaps(ctx, wg, dev)
if err != nil {
log.Error("Error intializing puremaps integration").Err(err).Send()
}
// Start fuse socket
if k.Bool("fuse.enabled") {
err = startFUSE(ctx, dev)
err = startFUSE(ctx, wg, dev)
if err != nil {
log.Error("Error starting fuse socket").Err(err).Send()
}
}
// Start control socket
err = startSocket(ctx, dev)
err = startSocket(ctx, wg, dev)
if err != nil {
log.Error("Error starting socket").Err(err).Send()
}
// Block forever
select {}
wg.Wait()
}
type x struct {
n int
*sync.WaitGroup
}
func (xy *x) Add(i int) {
xy.n += i
xy.WaitGroup.Add(i)
fmt.Println("add: counter:", xy.n)
}
func (xy *x) Done() {
xy.n -= 1
xy.WaitGroup.Done()
fmt.Println("done: counter:", xy.n)
}
func onReqPasskey() (uint32, error) {

View File

@ -18,7 +18,7 @@ const (
progressProperty = interfaceName + ".progress"
)
func initPureMaps(ctx context.Context, dev *infinitime.Device) error {
func initPureMaps(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error {
// Connect to session bus. This connection is for method calls.
conn, err := utils.NewSessionBusConn(ctx)
if err != nil {
@ -59,7 +59,10 @@ func initPureMaps(ctx context.Context, dev *infinitime.Device) error {
}
}
wg.Add(1)
go func() {
defer wg.Done("pureMaps")
signalCh := make(chan *dbus.Message, 10)
monitorConn.Eavesdrop(signalCh)

View File

@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"path/filepath"
"time"
"go.arsenm.dev/infinitime"
@ -11,7 +12,7 @@ import (
_ "modernc.org/sqlite"
)
func initMetrics(ctx context.Context, dev *infinitime.Device) error {
func initMetrics(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error {
// If metrics disabled, return nil
if !k.Bool("metrics.enabled") {
return nil
@ -125,6 +126,13 @@ func initMetrics(ctx context.Context, dev *infinitime.Device) error {
}()
}
wg.Add(1)
go func() {
defer wg.Done("metrics")
<-ctx.Done()
db.Close()
}()
log.Info("Initialized metrics collection").Send()
return nil

View File

@ -21,13 +21,14 @@ package main
import (
"context"
"go.arsenm.dev/infinitime"
"go.arsenm.dev/itd/mpris"
"go.arsenm.dev/itd/translit"
"go.arsenm.dev/logger/log"
)
func initMusicCtrl(ctx context.Context, dev *infinitime.Device) error {
func initMusicCtrl(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error {
mpris.Init(ctx)
maps := k.Strings("notifs.translit.use")
@ -54,9 +55,14 @@ func initMusicCtrl(ctx context.Context, dev *infinitime.Device) error {
if err != nil {
return err
}
wg.Add(1)
go func() {
defer wg.Done("musicCtrl")
// For every music event received
for musicEvt := range musicEvtCh {
for {
select {
case musicEvt := <-musicEvtCh:
// Perform appropriate action based on event
switch musicEvt {
case infinitime.MusicEventPlay:
@ -72,6 +78,9 @@ func initMusicCtrl(ctx context.Context, dev *infinitime.Device) error {
case infinitime.MusicEventVolDown:
mpris.VolDown(uint(k.Int("music.vol.interval")))
}
case <-ctx.Done():
return
}
}
}()

View File

@ -29,7 +29,7 @@ import (
"go.arsenm.dev/logger/log"
)
func initNotifRelay(ctx context.Context, dev *infinitime.Device) error {
func initNotifRelay(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error {
// Connect to dbus session bus
bus, err := utils.NewSessionBusConn(ctx)
if err != nil {
@ -54,9 +54,13 @@ func initNotifRelay(ctx context.Context, dev *infinitime.Device) error {
// Send events to channel
bus.Eavesdrop(notifCh)
wg.Add(1)
go func() {
defer wg.Done("notifRelay")
// For every event sent to channel
for v := range notifCh {
for {
select {
case v := <-notifCh:
// If firmware is updating, skip
if firmwareUpdating {
continue
@ -91,6 +95,10 @@ func initNotifRelay(ctx context.Context, dev *infinitime.Device) error {
}
dev.Notify(sender, msg)
case <-ctx.Done():
bus.Close()
return
}
}
}()

View File

@ -25,6 +25,7 @@ import (
"net"
"os"
"path/filepath"
"time"
"go.arsenm.dev/drpc/muxserver"
@ -41,7 +42,7 @@ var (
ErrDFUInvalidUpgType = errors.New("invalid upgrade type")
)
func startSocket(ctx context.Context, dev *infinitime.Device) error {
func startSocket(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error {
// Make socket directory if non-existant
err := os.MkdirAll(filepath.Dir(k.String("socket.path")), 0o755)
if err != nil {
@ -77,10 +78,13 @@ func startSocket(ctx context.Context, dev *infinitime.Device) error {
return err
}
go muxserver.New(mux).Serve(ctx, ln)
log.Info("Starting control socket").Str("path", k.String("socket.path")).Send()
// Log socket start
log.Info("Started control socket").Str("path", k.String("socket.path")).Send()
wg.Add(1)
go func() {
defer wg.Done("socket")
muxserver.New(mux).Serve(ctx, ln)
}()
return nil
}

16
waitgroup.go Normal file
View File

@ -0,0 +1,16 @@
package main
import (
"sync"
"go.arsenm.dev/logger/log"
)
type WaitGroup struct {
*sync.WaitGroup
}
func (wg WaitGroup) Done(c string) {
log.Info("Component stopped").Str("name", c).Send()
wg.WaitGroup.Done()
}

View File

@ -9,6 +9,7 @@ import (
"net/url"
"strconv"
"strings"
"time"
"go.arsenm.dev/infinitime"
@ -61,7 +62,14 @@ type OSMData []struct {
var sendWeatherCh = make(chan struct{}, 1)
func initWeather(ctx context.Context, dev *infinitime.Device) error {
func sleepCtx(ctx context.Context, d time.Duration) {
select {
case <-time.After(d):
case <-ctx.Done():
}
}
func initWeather(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error {
if !k.Bool("weather.enabled") {
return nil
}
@ -74,14 +82,21 @@ func initWeather(ctx context.Context, dev *infinitime.Device) error {
timer := time.NewTimer(time.Hour)
wg.Add(1)
go func() {
defer wg.Done("weather")
for {
_, ok := <-ctx.Done()
if !ok {
return
}
// Attempt to get weather
data, err := getWeather(ctx, lat, lon)
if err != nil {
log.Warn("Error getting weather data").Err(err).Send()
// Wait 15 minutes before retrying
time.Sleep(15 * time.Minute)
sleepCtx(ctx, 15*time.Minute)
continue
}
@ -174,6 +189,8 @@ func initWeather(ctx context.Context, dev *infinitime.Device) error {
select {
case <-timer.C:
case <-sendWeatherCh:
case <-ctx.Done():
return
}
}
}()