Compare commits

...

2 Commits

Author SHA1 Message Date
9939f724c4 Re-add watch commands to itctl 2022-04-23 18:46:49 -07:00
8dce33f7b1 Enable RPCX gateway 2022-04-23 11:29:16 -07:00
5 changed files with 260 additions and 64 deletions

View File

@ -2,6 +2,8 @@ package main
import ( import (
"os" "os"
"os/signal"
"syscall"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -174,6 +176,49 @@ func main() {
}, },
}, },
}, },
{
Name: "watch",
Usage: "Watch a value for changes",
Subcommands: []*cli.Command{
{
Flags: []cli.Flag{
&cli.BoolFlag{Name: "json"},
&cli.BoolFlag{Name: "shell"},
},
Name: "heart",
Usage: "Watch heart rate value for changes",
Action: watchHeart,
},
{
Flags: []cli.Flag{
&cli.BoolFlag{Name: "json"},
&cli.BoolFlag{Name: "shell"},
},
Name: "steps",
Usage: "Watch step count value for changes",
Action: watchStepCount,
},
{
Flags: []cli.Flag{
&cli.BoolFlag{Name: "json"},
&cli.BoolFlag{Name: "shell"},
},
Name: "motion",
Usage: "Watch motion coordinates for changes",
Action: watchMotion,
},
{
Flags: []cli.Flag{
&cli.BoolFlag{Name: "json"},
&cli.BoolFlag{Name: "shell"},
},
Name: "battery",
Aliases: []string{"batt"},
Usage: "Watch battery level value for changes",
Action: watchBattLevel,
},
},
},
}, },
Before: func(c *cli.Context) error { Before: func(c *cli.Context) error {
newClient, err := api.New(c.String("socket-path")) newClient, err := api.New(c.String("socket-path"))
@ -196,3 +241,17 @@ func main() {
log.Fatal().Err(err).Msg("Error while running app") log.Fatal().Err(err).Msg("Error while running app")
} }
} }
func catchSignal(fn func()) {
sigCh := make(chan os.Signal, 1)
signal.Notify(
sigCh,
syscall.SIGINT,
syscall.SIGTERM,
)
go func() {
<-sigCh
fn()
os.Exit(0)
}()
}

104
cmd/itctl/watch.go Normal file
View File

@ -0,0 +1,104 @@
package main
import (
"encoding/json"
"fmt"
"os"
"github.com/urfave/cli/v2"
)
func watchHeart(c *cli.Context) error {
heartCh, cancel, err := client.WatchHeartRate()
if err != nil {
return err
}
catchSignal(cancel)
for heartRate := range heartCh {
if c.Bool("json") {
json.NewEncoder(os.Stdout).Encode(
map[string]uint8{"heartRate": heartRate},
)
} else if c.Bool("shell") {
fmt.Printf("HEART_RATE=%d\n", heartRate)
} else {
fmt.Println(heartRate, "BPM")
}
}
return nil
}
func watchBattLevel(c *cli.Context) error {
battLevelCh, cancel, err := client.WatchBatteryLevel()
if err != nil {
return err
}
catchSignal(cancel)
for battLevel := range battLevelCh {
if c.Bool("json") {
json.NewEncoder(os.Stdout).Encode(
map[string]uint8{"battLevel": battLevel},
)
} else if c.Bool("shell") {
fmt.Printf("BATTERY_LEVEL=%d\n", battLevel)
} else {
fmt.Printf("%d%%\n", battLevel)
}
}
return nil
}
func watchStepCount(c *cli.Context) error {
stepCountCh, cancel, err := client.WatchStepCount()
if err != nil {
return err
}
catchSignal(cancel)
for stepCount := range stepCountCh {
if c.Bool("json") {
json.NewEncoder(os.Stdout).Encode(
map[string]uint32{"stepCount": stepCount},
)
} else if c.Bool("shell") {
fmt.Printf("STEP_COUNT=%d\n", stepCount)
} else {
fmt.Println(stepCount, "Steps")
}
}
return nil
}
func watchMotion(c *cli.Context) error {
motionCh, cancel, err := client.WatchMotion()
if err != nil {
return err
}
catchSignal(cancel)
for motionVals := range motionCh {
if c.Bool("json") {
json.NewEncoder(os.Stdout).Encode(motionVals)
} else if c.Bool("shell") {
fmt.Printf(
"X=%d\nY=%d\nZ=%d\n",
motionVals.X,
motionVals.Y,
motionVals.Z,
)
} else {
fmt.Println(motionVals)
}
}
return nil
}

4
go.mod
View File

@ -11,11 +11,11 @@ require (
github.com/knadh/koanf v1.4.0 github.com/knadh/koanf v1.4.0
github.com/mattn/go-isatty v0.0.14 github.com/mattn/go-isatty v0.0.14
github.com/mozillazg/go-pinyin v0.19.0 github.com/mozillazg/go-pinyin v0.19.0
github.com/rs/zerolog v1.26.0 github.com/rs/zerolog v1.26.1
github.com/smallnest/rpcx v1.7.4 github.com/smallnest/rpcx v1.7.4
github.com/urfave/cli/v2 v2.3.0 github.com/urfave/cli/v2 v2.3.0
github.com/vmihailenco/msgpack/v5 v5.3.5 github.com/vmihailenco/msgpack/v5 v5.3.5
go.arsenm.dev/infinitime v0.0.0-20220416112421-b7a50271bece go.arsenm.dev/infinitime v0.0.0-20220424014025-bebd1017c532
golang.org/x/text v0.3.7 golang.org/x/text v0.3.7
) )

9
go.sum
View File

@ -447,8 +447,8 @@ github.com/rpcxio/libkv v0.5.1-0.20210420120011-1fceaedca8a5/go.mod h1:zHGgtLr3c
github.com/rs/cors v1.8.2 h1:KCooALfAYGs415Cwu5ABvv9n9509fSiG5SQJn/AQo4U= github.com/rs/cors v1.8.2 h1:KCooALfAYGs415Cwu5ABvv9n9509fSiG5SQJn/AQo4U=
github.com/rs/cors v1.8.2/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/rs/cors v1.8.2/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.26.0 h1:ORM4ibhEZeTeQlCojCK2kPz1ogAY4bGs4tD+SaAdGaE= github.com/rs/zerolog v1.26.1 h1:/ihwxqH+4z8UxyI70wM1z9yCvkWcfz/a3mj48k/Zngc=
github.com/rs/zerolog v1.26.0/go.mod h1:yBiM87lvSqX8h0Ww4sdzNSkVYZ8dL2xjZJG1lAuGZEo= github.com/rs/zerolog v1.26.1/go.mod h1:/wSSJWX7lVrsOwlbyTRSOJvqRlc+WjWlfes+CiJ+tmc=
github.com/rubyist/circuitbreaker v2.2.1+incompatible h1:KUKd/pV8Geg77+8LNDwdow6rVCAYOp8+kHUyFvL6Mhk= github.com/rubyist/circuitbreaker v2.2.1+incompatible h1:KUKd/pV8Geg77+8LNDwdow6rVCAYOp8+kHUyFvL6Mhk=
github.com/rubyist/circuitbreaker v2.2.1+incompatible/go.mod h1:Ycs3JgJADPuzJDwffe12k6BZT8hxVi6lFK+gWYJLN4A= github.com/rubyist/circuitbreaker v2.2.1+incompatible/go.mod h1:Ycs3JgJADPuzJDwffe12k6BZT8hxVi6lFK+gWYJLN4A=
github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo= github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo=
@ -560,8 +560,8 @@ github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.4 h1:zNWRjYUW32G9KirMXYHQHVNFkXvMI7LpgNW2AgYAoIs= github.com/yuin/goldmark v1.4.4 h1:zNWRjYUW32G9KirMXYHQHVNFkXvMI7LpgNW2AgYAoIs=
github.com/yuin/goldmark v1.4.4/go.mod h1:rmuwmfZ0+bvzB24eSC//bk1R1Zp3hM0OXYv/G2LIilg= github.com/yuin/goldmark v1.4.4/go.mod h1:rmuwmfZ0+bvzB24eSC//bk1R1Zp3hM0OXYv/G2LIilg=
go.arsenm.dev/infinitime v0.0.0-20220416112421-b7a50271bece h1:ns/GMc4E8ZUZ9TEXhXgU4M+5sRaOLTFFoBWEJ67p3YM= go.arsenm.dev/infinitime v0.0.0-20220424014025-bebd1017c532 h1:HOFaxlEKacGw1nDs23qRv+FTX+PR+v4l1Sujfq5S3pc=
go.arsenm.dev/infinitime v0.0.0-20220416112421-b7a50271bece/go.mod h1:Prvwx7Y2y8HsNRA1tPptduW9jzuw/JffmocvoHcDbYo= go.arsenm.dev/infinitime v0.0.0-20220424014025-bebd1017c532/go.mod h1:1cBQ3fp6QlRbSqu9kEBAHsVThINj31FtqHIYVsQ7wgg=
go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA=
go.opentelemetry.io/otel v0.19.0/go.mod h1:j9bF567N9EfomkSidSfmMwIwIBuP37AMAIzVW85OxSg= go.opentelemetry.io/otel v0.19.0/go.mod h1:j9bF567N9EfomkSidSfmMwIwIBuP37AMAIzVW85OxSg=
go.opentelemetry.io/otel v1.6.3 h1:FLOfo8f9JzFVFVyU+MSRJc2HdEAXQgm7pIv2uFKRSZE= go.opentelemetry.io/otel v1.6.3 h1:FLOfo8f9JzFVFVyU+MSRJc2HdEAXQgm7pIv2uFKRSZE=
@ -585,6 +585,7 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/crypto v0.0.0-20220408190544-5352b0902921 h1:iU7T1X1J6yxDr0rda54sWGkHgOp5XJrqm79gcNlC2VM= golang.org/x/crypto v0.0.0-20220408190544-5352b0902921 h1:iU7T1X1J6yxDr0rda54sWGkHgOp5XJrqm79gcNlC2VM=
golang.org/x/crypto v0.0.0-20220408190544-5352b0902921/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220408190544-5352b0902921/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=

148
socket.go
View File

@ -46,6 +46,7 @@ var (
ErrDFUInvalidFile = errors.New("provided file is invalid for given upgrade type") ErrDFUInvalidFile = errors.New("provided file is invalid for given upgrade type")
ErrDFUNotEnoughFiles = errors.New("not enough files provided for given upgrade type") ErrDFUNotEnoughFiles = errors.New("not enough files provided for given upgrade type")
ErrDFUInvalidUpgType = errors.New("invalid upgrade type") ErrDFUInvalidUpgType = errors.New("invalid upgrade type")
ErrRPCXUsingGateway = errors.New("bidirectional requests are unsupported over gateway")
) )
type DoneMap map[string]chan struct{} type DoneMap map[string]chan struct{}
@ -116,7 +117,7 @@ func startSocket(dev *infinitime.Device) error {
return err return err
} }
go srv.ServeListener("unix", ln) go srv.ServeListener("tcp", ln)
// Log socket start // Log socket start
log.Info().Str("path", k.String("socket.path")).Msg("Started control socket") log.Info().Str("path", k.String("socket.path")).Msg("Started control socket")
@ -136,7 +137,12 @@ func (i *ITD) HeartRate(_ context.Context, _ none, out *uint8) error {
} }
func (i *ITD) WatchHeartRate(ctx context.Context, _ none, out *string) error { func (i *ITD) WatchHeartRate(ctx context.Context, _ none, out *string) error {
clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn) // Get client's connection
clientConn, ok := ctx.Value(server.RemoteConnContextKey).(net.Conn)
// If user is using gateway, the client connection will not be available
if !ok {
return ErrRPCXUsingGateway
}
heartRateCh, cancel, err := i.dev.WatchHeartRate() heartRateCh, cancel, err := i.dev.WatchHeartRate()
if err != nil { if err != nil {
@ -178,7 +184,12 @@ func (i *ITD) BatteryLevel(_ context.Context, _ none, out *uint8) error {
} }
func (i *ITD) WatchBatteryLevel(ctx context.Context, _ none, out *string) error { func (i *ITD) WatchBatteryLevel(ctx context.Context, _ none, out *string) error {
clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn) // Get client's connection
clientConn, ok := ctx.Value(server.RemoteConnContextKey).(net.Conn)
// If user is using gateway, the client connection will not be available
if !ok {
return ErrRPCXUsingGateway
}
battLevelCh, cancel, err := i.dev.WatchBatteryLevel() battLevelCh, cancel, err := i.dev.WatchBatteryLevel()
if err != nil { if err != nil {
@ -220,7 +231,12 @@ func (i *ITD) Motion(_ context.Context, _ none, out *infinitime.MotionValues) er
} }
func (i *ITD) WatchMotion(ctx context.Context, _ none, out *string) error { func (i *ITD) WatchMotion(ctx context.Context, _ none, out *string) error {
clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn) // Get client's connection
clientConn, ok := ctx.Value(server.RemoteConnContextKey).(net.Conn)
// If user is using gateway, the client connection will not be available
if !ok {
return ErrRPCXUsingGateway
}
motionValsCh, cancel, err := i.dev.WatchMotion() motionValsCh, cancel, err := i.dev.WatchMotion()
if err != nil { if err != nil {
@ -262,7 +278,12 @@ func (i *ITD) StepCount(_ context.Context, _ none, out *uint32) error {
} }
func (i *ITD) WatchStepCount(ctx context.Context, _ none, out *string) error { func (i *ITD) WatchStepCount(ctx context.Context, _ none, out *string) error {
clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn) // Get client's connection
clientConn, ok := ctx.Value(server.RemoteConnContextKey).(net.Conn)
// If user is using gateway, the client connection will not be available
if !ok {
return ErrRPCXUsingGateway
}
stepCountCh, cancel, err := i.dev.WatchStepCount() stepCountCh, cancel, err := i.dev.WatchStepCount()
if err != nil { if err != nil {
@ -365,23 +386,26 @@ func (i *ITD) FirmwareUpgrade(ctx context.Context, reqData api.FwUpgradeData, ou
id := uuid.New().String() id := uuid.New().String()
*out = id *out = id
clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn) // Get client's connection
clientConn, ok := ctx.Value(server.RemoteConnContextKey).(net.Conn)
// If user is using gateway, the client connection will not be available
if ok {
go func() {
// For every progress event
for event := range i.dev.DFU.Progress() {
data, err := msgpack.Marshal(event)
if err != nil {
log.Error().Err(err).Msg("Error encoding DFU progress event")
continue
}
go func() { i.srv.SendMessage(clientConn, id, "DFUProgress", nil, data)
// For every progress event
for event := range i.dev.DFU.Progress() {
data, err := msgpack.Marshal(event)
if err != nil {
log.Error().Err(err).Msg("Error encoding DFU progress event")
continue
} }
i.srv.SendMessage(clientConn, id, "DFUProgress", nil, data) firmwareUpdating = false
} i.srv.SendMessage(clientConn, id, "Done", nil, nil)
}()
firmwareUpdating = false }
i.srv.SendMessage(clientConn, id, "Done", nil, nil)
}()
// Set firmwareUpdating // Set firmwareUpdating
firmwareUpdating = true firmwareUpdating = true
@ -463,7 +487,6 @@ func (fs *FS) ReadDir(_ context.Context, dir string, out *[]api.FileInfo) error
func (fs *FS) Upload(ctx context.Context, paths [2]string, out *string) error { func (fs *FS) Upload(ctx context.Context, paths [2]string, out *string) error {
fs.updateFS() fs.updateFS()
clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn)
localFile, err := os.Open(paths[1]) localFile, err := os.Open(paths[1])
if err != nil { if err != nil {
@ -483,23 +506,28 @@ func (fs *FS) Upload(ctx context.Context, paths [2]string, out *string) error {
id := uuid.New().String() id := uuid.New().String()
*out = id *out = id
go func() { // Get client's connection
// For every progress event clientConn, ok := ctx.Value(server.RemoteConnContextKey).(net.Conn)
for sent := range remoteFile.Progress() { // If user is using gateway, the client connection will not be available
data, err := msgpack.Marshal(api.FSTransferProgress{ if ok {
Total: remoteFile.Size(), go func() {
Sent: sent, // For every progress event
}) for sent := range remoteFile.Progress() {
if err != nil { data, err := msgpack.Marshal(api.FSTransferProgress{
log.Error().Err(err).Msg("Error encoding filesystem transfer progress event") Total: remoteFile.Size(),
continue Sent: sent,
})
if err != nil {
log.Error().Err(err).Msg("Error encoding filesystem transfer progress event")
continue
}
fs.srv.SendMessage(clientConn, id, "FSProgress", nil, data)
} }
fs.srv.SendMessage(clientConn, id, "FSProgress", nil, data) fs.srv.SendMessage(clientConn, id, "Done", nil, nil)
} }()
}
fs.srv.SendMessage(clientConn, id, "Done", nil, nil)
}()
go func() { go func() {
io.Copy(remoteFile, localFile) io.Copy(remoteFile, localFile)
@ -512,40 +540,44 @@ func (fs *FS) Upload(ctx context.Context, paths [2]string, out *string) error {
func (fs *FS) Download(ctx context.Context, paths [2]string, out *string) error { func (fs *FS) Download(ctx context.Context, paths [2]string, out *string) error {
fs.updateFS() fs.updateFS()
clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn)
localFile, err := os.Create(paths[0]) localFile, err := os.Create(paths[0])
if err != nil { if err != nil {
return err return err
} }
remoteFile, err := fs.fs.Open(paths[1]) remoteFile, err := fs.fs.Open(paths[1])
if err != nil { if err != nil {
return err return err
} }
id := uuid.New().String() id := uuid.New().String()
*out = id *out = id
go func() { // Get client's connection
// For every progress event clientConn, ok := ctx.Value(server.RemoteConnContextKey).(net.Conn)
for rcvd := range remoteFile.Progress() { // If user is using gateway, the client connection will not be available
data, err := msgpack.Marshal(api.FSTransferProgress{ if ok {
Total: remoteFile.Size(), go func() {
Sent: rcvd, // For every progress event
}) for rcvd := range remoteFile.Progress() {
if err != nil { data, err := msgpack.Marshal(api.FSTransferProgress{
log.Error().Err(err).Msg("Error encoding filesystem transfer progress event") Total: remoteFile.Size(),
continue Sent: rcvd,
})
if err != nil {
log.Error().Err(err).Msg("Error encoding filesystem transfer progress event")
continue
}
fs.srv.SendMessage(clientConn, id, "FSProgress", nil, data)
} }
fs.srv.SendMessage(clientConn, id, "FSProgress", nil, data) fs.srv.SendMessage(clientConn, id, "Done", nil, nil)
} localFile.Close()
remoteFile.Close()
fs.srv.SendMessage(clientConn, id, "Done", nil, nil) }()
localFile.Close() }
remoteFile.Close()
}()
go io.Copy(localFile, remoteFile) go io.Copy(localFile, remoteFile)