From f7ac77273e0dc4b68f534fbe96e922eef225f7d7 Mon Sep 17 00:00:00 2001 From: Elara Musayelyan Date: Sun, 26 Mar 2023 14:34:29 -0700 Subject: [PATCH] Gracefully shut down each component before exiting --- calls.go | 88 ++++++++++++++++++++++++++++------------------------ fuse.go | 11 +++++-- go.mod | 4 ++- go.sum | 3 ++ main.go | 71 ++++++++++++++++++++++++++---------------- maps.go | 5 ++- metrics.go | 10 +++++- music.go | 41 ++++++++++++++---------- notifs.go | 78 +++++++++++++++++++++++++--------------------- socket.go | 12 ++++--- waitgroup.go | 16 ++++++++++ weather.go | 21 +++++++++++-- 12 files changed, 230 insertions(+), 130 deletions(-) create mode 100644 waitgroup.go diff --git a/calls.go b/calls.go index 51b230d..6f55b66 100644 --- a/calls.go +++ b/calls.go @@ -10,7 +10,7 @@ import ( "go.arsenm.dev/logger/log" ) -func initCallNotifs(ctx context.Context, dev *infinitime.Device) error { +func initCallNotifs(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error { // Connect to system bus. This connection is for method calls. conn, err := utils.NewSystemBusConn(ctx) if err != nil { @@ -53,49 +53,55 @@ func initCallNotifs(ctx context.Context, dev *infinitime.Device) error { var respHandlerOnce sync.Once var callObj dbus.BusObject + wg.Add(1) go func() { - // For every message received - for event := range callCh { - // Get path to call object - callPath := event.Body[0].(dbus.ObjectPath) - // Get call object - callObj = conn.Object("org.freedesktop.ModemManager1", callPath) + defer wg.Done("callNotifs") + for { + select { + case event := <-callCh: + // Get path to call object + callPath := event.Body[0].(dbus.ObjectPath) + // Get call object + callObj = conn.Object("org.freedesktop.ModemManager1", callPath) - // Get phone number from call object using method call connection - phoneNum, err := getPhoneNum(conn, callObj) - if err != nil { - log.Error("Error getting phone number").Err(err).Send() - continue - } - - // Send call notification to InfiniTime - resCh, err := dev.NotifyCall(phoneNum) - if err != nil { - continue - } - - go respHandlerOnce.Do(func() { - // Wait for PineTime response - for res := range resCh { - switch res { - case infinitime.CallStatusAccepted: - // Attempt to accept call - err = acceptCall(ctx, conn, callObj) - if err != nil { - log.Warn("Error accepting call").Err(err).Send() - } - case infinitime.CallStatusDeclined: - // Attempt to decline call - err = declineCall(ctx, conn, callObj) - if err != nil { - log.Warn("Error declining call").Err(err).Send() - } - case infinitime.CallStatusMuted: - // Warn about unimplemented muting - log.Warn("Muting calls is not implemented").Send() - } + // Get phone number from call object using method call connection + phoneNum, err := getPhoneNum(conn, callObj) + if err != nil { + log.Error("Error getting phone number").Err(err).Send() + continue } - }) + + // Send call notification to InfiniTime + resCh, err := dev.NotifyCall(phoneNum) + if err != nil { + continue + } + + go respHandlerOnce.Do(func() { + // Wait for PineTime response + for res := range resCh { + switch res { + case infinitime.CallStatusAccepted: + // Attempt to accept call + err = acceptCall(ctx, conn, callObj) + if err != nil { + log.Warn("Error accepting call").Err(err).Send() + } + case infinitime.CallStatusDeclined: + // Attempt to decline call + err = declineCall(ctx, conn, callObj) + if err != nil { + log.Warn("Error declining call").Err(err).Send() + } + case infinitime.CallStatusMuted: + // Warn about unimplemented muting + log.Warn("Muting calls is not implemented").Send() + } + } + }) + case <-ctx.Done(): + return + } } }() diff --git a/fuse.go b/fuse.go index 1dc7a6e..b15dac3 100644 --- a/fuse.go +++ b/fuse.go @@ -11,7 +11,7 @@ import ( "go.arsenm.dev/logger/log" ) -func startFUSE(ctx context.Context, dev *infinitime.Device) error { +func startFUSE(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error { // This is where we'll mount the FS err := os.MkdirAll(k.String("fuse.mountpoint"), 0o755) if err != nil && !os.IsExist(err) { @@ -55,7 +55,12 @@ func startFUSE(ctx context.Context, dev *infinitime.Device) error { return err } - // Wait until unmount before exiting - go server.Serve() + wg.Add(1) + go func() { + defer wg.Done("fuse") + <-ctx.Done() + server.Unmount() + }() + return nil } diff --git a/go.mod b/go.mod index 668793a..6148dff 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,8 @@ go 1.18 replace fyne.io/x/fyne => github.com/metal3d/fyne-x v0.0.0-20220508095732-177117e583fb +replace go.arsenm.dev/drpc => /home/elara/Code/drpc + require ( fyne.io/fyne/v2 v2.3.0 fyne.io/x/fyne v0.0.0-20220107050838-c4a1de51d4ce @@ -74,7 +76,7 @@ require ( golang.org/x/mobile v0.0.0-20221110043201-43a038452099 // indirect golang.org/x/mod v0.7.0 // indirect golang.org/x/net v0.4.0 // indirect - golang.org/x/sys v0.3.0 // indirect + golang.org/x/sys v0.6.0 // indirect golang.org/x/tools v0.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect honnef.co/go/js/dom v0.0.0-20221001195520-26252dedbe70 // indirect diff --git a/go.sum b/go.sum index 347e737..6575588 100644 --- a/go.sum +++ b/go.sum @@ -341,6 +341,7 @@ github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k= github.com/lucor/goinfo v0.0.0-20210802170112-c078a2b0f08b/go.mod h1:PRq09yoB+Q2OJReAmwzKivcYyremnibWGbK7WfftHzc= github.com/magiconair/properties v1.8.5/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= @@ -756,6 +757,8 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/main.go b/main.go index 0accec8..2803d99 100644 --- a/main.go +++ b/main.go @@ -26,6 +26,8 @@ import ( "os" "os/signal" "strconv" + "sync" + "syscall" "time" @@ -74,21 +76,7 @@ func main() { LogLevel: level, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - sigCh := make(chan os.Signal, 1) - go func() { - <-sigCh - cancel() - time.Sleep(200 * time.Millisecond) - os.Exit(0) - }() - signal.Notify( - sigCh, - syscall.SIGINT, - syscall.SIGTERM, - ) + ctx := context.Background() // Connect to InfiniTime with default options dev, err := infinitime.Connect(ctx, opts) @@ -145,57 +133,88 @@ func main() { log.Error("Error setting current time on connected InfiniTime").Err(err).Send() } + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + go func() { + sig := <-sigCh + log.Warn("Signal received, shutting down").Stringer("signal", sig).Send() + cancel() + }() + + wg := WaitGroup{&sync.WaitGroup{}} + // Initialize music controls - err = initMusicCtrl(ctx, dev) + err = initMusicCtrl(ctx, wg, dev) if err != nil { log.Error("Error initializing music control").Err(err).Send() } // Start control socket - err = initCallNotifs(ctx, dev) + err = initCallNotifs(ctx, wg, dev) if err != nil { log.Error("Error initializing call notifications").Err(err).Send() } // Initialize notification relay - err = initNotifRelay(ctx, dev) + err = initNotifRelay(ctx, wg, dev) if err != nil { log.Error("Error initializing notification relay").Err(err).Send() } // Initializa weather - err = initWeather(ctx, dev) + err = initWeather(ctx, wg, dev) if err != nil { log.Error("Error initializing weather").Err(err).Send() } // Initialize metrics collection - err = initMetrics(ctx, dev) + err = initMetrics(ctx, wg, dev) if err != nil { log.Error("Error intializing metrics collection").Err(err).Send() } - // Initialize metrics collection - err = initPureMaps(ctx, dev) + // Initialize puremaps integration + err = initPureMaps(ctx, wg, dev) if err != nil { log.Error("Error intializing puremaps integration").Err(err).Send() } // Start fuse socket if k.Bool("fuse.enabled") { - err = startFUSE(ctx, dev) + err = startFUSE(ctx, wg, dev) if err != nil { log.Error("Error starting fuse socket").Err(err).Send() } } // Start control socket - err = startSocket(ctx, dev) + err = startSocket(ctx, wg, dev) if err != nil { log.Error("Error starting socket").Err(err).Send() } - // Block forever - select {} + + wg.Wait() +} + +type x struct { + n int + *sync.WaitGroup +} + +func (xy *x) Add(i int) { + xy.n += i + xy.WaitGroup.Add(i) + fmt.Println("add: counter:", xy.n) +} + +func (xy *x) Done() { + xy.n -= 1 + xy.WaitGroup.Done() + fmt.Println("done: counter:", xy.n) } func onReqPasskey() (uint32, error) { diff --git a/maps.go b/maps.go index 774f18b..4178343 100644 --- a/maps.go +++ b/maps.go @@ -18,7 +18,7 @@ const ( progressProperty = interfaceName + ".progress" ) -func initPureMaps(ctx context.Context, dev *infinitime.Device) error { +func initPureMaps(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error { // Connect to session bus. This connection is for method calls. conn, err := utils.NewSessionBusConn(ctx) if err != nil { @@ -59,7 +59,10 @@ func initPureMaps(ctx context.Context, dev *infinitime.Device) error { } } + wg.Add(1) go func() { + defer wg.Done("pureMaps") + signalCh := make(chan *dbus.Message, 10) monitorConn.Eavesdrop(signalCh) diff --git a/metrics.go b/metrics.go index cb30133..42aad6f 100644 --- a/metrics.go +++ b/metrics.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "path/filepath" + "time" "go.arsenm.dev/infinitime" @@ -11,7 +12,7 @@ import ( _ "modernc.org/sqlite" ) -func initMetrics(ctx context.Context, dev *infinitime.Device) error { +func initMetrics(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error { // If metrics disabled, return nil if !k.Bool("metrics.enabled") { return nil @@ -125,6 +126,13 @@ func initMetrics(ctx context.Context, dev *infinitime.Device) error { }() } + wg.Add(1) + go func() { + defer wg.Done("metrics") + <-ctx.Done() + db.Close() + }() + log.Info("Initialized metrics collection").Send() return nil diff --git a/music.go b/music.go index c3f4a1a..dcf3a50 100644 --- a/music.go +++ b/music.go @@ -20,6 +20,7 @@ package main import ( "context" + "go.arsenm.dev/infinitime" "go.arsenm.dev/itd/mpris" @@ -27,7 +28,7 @@ import ( "go.arsenm.dev/logger/log" ) -func initMusicCtrl(ctx context.Context, dev *infinitime.Device) error { +func initMusicCtrl(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error { mpris.Init(ctx) maps := k.Strings("notifs.translit.use") @@ -54,23 +55,31 @@ func initMusicCtrl(ctx context.Context, dev *infinitime.Device) error { if err != nil { return err } + + wg.Add(1) go func() { + defer wg.Done("musicCtrl") // For every music event received - for musicEvt := range musicEvtCh { - // Perform appropriate action based on event - switch musicEvt { - case infinitime.MusicEventPlay: - mpris.Play() - case infinitime.MusicEventPause: - mpris.Pause() - case infinitime.MusicEventNext: - mpris.Next() - case infinitime.MusicEventPrev: - mpris.Prev() - case infinitime.MusicEventVolUp: - mpris.VolUp(uint(k.Int("music.vol.interval"))) - case infinitime.MusicEventVolDown: - mpris.VolDown(uint(k.Int("music.vol.interval"))) + for { + select { + case musicEvt := <-musicEvtCh: + // Perform appropriate action based on event + switch musicEvt { + case infinitime.MusicEventPlay: + mpris.Play() + case infinitime.MusicEventPause: + mpris.Pause() + case infinitime.MusicEventNext: + mpris.Next() + case infinitime.MusicEventPrev: + mpris.Prev() + case infinitime.MusicEventVolUp: + mpris.VolUp(uint(k.Int("music.vol.interval"))) + case infinitime.MusicEventVolDown: + mpris.VolDown(uint(k.Int("music.vol.interval"))) + } + case <-ctx.Done(): + return } } }() diff --git a/notifs.go b/notifs.go index e62254d..aaf33c3 100644 --- a/notifs.go +++ b/notifs.go @@ -29,7 +29,7 @@ import ( "go.arsenm.dev/logger/log" ) -func initNotifRelay(ctx context.Context, dev *infinitime.Device) error { +func initNotifRelay(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error { // Connect to dbus session bus bus, err := utils.NewSessionBusConn(ctx) if err != nil { @@ -54,43 +54,51 @@ func initNotifRelay(ctx context.Context, dev *infinitime.Device) error { // Send events to channel bus.Eavesdrop(notifCh) + wg.Add(1) go func() { + defer wg.Done("notifRelay") // For every event sent to channel - for v := range notifCh { - // If firmware is updating, skip - if firmwareUpdating { - continue + for { + select { + case v := <-notifCh: + // If firmware is updating, skip + if firmwareUpdating { + continue + } + + // If body does not contain 5 elements, skip + if len(v.Body) < 5 { + continue + } + + // Get requred fields + sender, summary, body := v.Body[0].(string), v.Body[3].(string), v.Body[4].(string) + + // If fields are ignored in config, skip + if ignored(sender, summary, body) { + continue + } + + maps := k.Strings("notifs.translit.use") + translit.Transliterators["custom"] = translit.Map(k.Strings("notifs.translit.custom")) + sender = translit.Transliterate(sender, maps...) + summary = translit.Transliterate(summary, maps...) + body = translit.Transliterate(body, maps...) + + var msg string + // If summary does not exist, set message to body. + // If it does, set message to summary, two newlines, and then body + if summary == "" { + msg = body + } else { + msg = fmt.Sprintf("%s\n\n%s", summary, body) + } + + dev.Notify(sender, msg) + case <-ctx.Done(): + bus.Close() + return } - - // If body does not contain 5 elements, skip - if len(v.Body) < 5 { - continue - } - - // Get requred fields - sender, summary, body := v.Body[0].(string), v.Body[3].(string), v.Body[4].(string) - - // If fields are ignored in config, skip - if ignored(sender, summary, body) { - continue - } - - maps := k.Strings("notifs.translit.use") - translit.Transliterators["custom"] = translit.Map(k.Strings("notifs.translit.custom")) - sender = translit.Transliterate(sender, maps...) - summary = translit.Transliterate(summary, maps...) - body = translit.Transliterate(body, maps...) - - var msg string - // If summary does not exist, set message to body. - // If it does, set message to summary, two newlines, and then body - if summary == "" { - msg = body - } else { - msg = fmt.Sprintf("%s\n\n%s", summary, body) - } - - dev.Notify(sender, msg) } }() diff --git a/socket.go b/socket.go index f31338f..6d59244 100644 --- a/socket.go +++ b/socket.go @@ -25,6 +25,7 @@ import ( "net" "os" "path/filepath" + "time" "go.arsenm.dev/drpc/muxserver" @@ -41,7 +42,7 @@ var ( ErrDFUInvalidUpgType = errors.New("invalid upgrade type") ) -func startSocket(ctx context.Context, dev *infinitime.Device) error { +func startSocket(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error { // Make socket directory if non-existant err := os.MkdirAll(filepath.Dir(k.String("socket.path")), 0o755) if err != nil { @@ -77,10 +78,13 @@ func startSocket(ctx context.Context, dev *infinitime.Device) error { return err } - go muxserver.New(mux).Serve(ctx, ln) + log.Info("Starting control socket").Str("path", k.String("socket.path")).Send() - // Log socket start - log.Info("Started control socket").Str("path", k.String("socket.path")).Send() + wg.Add(1) + go func() { + defer wg.Done("socket") + muxserver.New(mux).Serve(ctx, ln) + }() return nil } diff --git a/waitgroup.go b/waitgroup.go new file mode 100644 index 0000000..8d9f734 --- /dev/null +++ b/waitgroup.go @@ -0,0 +1,16 @@ +package main + +import ( + "sync" + + "go.arsenm.dev/logger/log" +) + +type WaitGroup struct { + *sync.WaitGroup +} + +func (wg WaitGroup) Done(c string) { + log.Info("Component stopped").Str("name", c).Send() + wg.WaitGroup.Done() +} diff --git a/weather.go b/weather.go index b933434..9825c26 100644 --- a/weather.go +++ b/weather.go @@ -9,6 +9,7 @@ import ( "net/url" "strconv" "strings" + "time" "go.arsenm.dev/infinitime" @@ -61,7 +62,14 @@ type OSMData []struct { var sendWeatherCh = make(chan struct{}, 1) -func initWeather(ctx context.Context, dev *infinitime.Device) error { +func sleepCtx(ctx context.Context, d time.Duration) { + select { + case <-time.After(d): + case <-ctx.Done(): + } +} + +func initWeather(ctx context.Context, wg WaitGroup, dev *infinitime.Device) error { if !k.Bool("weather.enabled") { return nil } @@ -74,14 +82,21 @@ func initWeather(ctx context.Context, dev *infinitime.Device) error { timer := time.NewTimer(time.Hour) + wg.Add(1) go func() { + defer wg.Done("weather") for { + _, ok := <-ctx.Done() + if !ok { + return + } + // Attempt to get weather data, err := getWeather(ctx, lat, lon) if err != nil { log.Warn("Error getting weather data").Err(err).Send() // Wait 15 minutes before retrying - time.Sleep(15 * time.Minute) + sleepCtx(ctx, 15*time.Minute) continue } @@ -174,6 +189,8 @@ func initWeather(ctx context.Context, dev *infinitime.Device) error { select { case <-timer.C: case <-sendWeatherCh: + case <-ctx.Done(): + return } } }()