Switch to lrpc and use context to handle signals

This commit is contained in:
2022-05-01 11:36:28 -07:00
parent c929635029
commit a1ee021675
14 changed files with 265 additions and 702 deletions

View File

@@ -1,117 +1,30 @@
package api
import (
"context"
"net"
"github.com/smallnest/rpcxlite/client"
"github.com/smallnest/rpcxlite/protocol"
"github.com/vmihailenco/msgpack/v5"
"go.arsenm.dev/infinitime"
"go.arsenm.dev/lrpc/client"
"go.arsenm.dev/lrpc/codec"
)
const DefaultAddr = "/tmp/itd/socket"
type Client struct {
itdClient client.XClient
itdCh chan *protocol.Message
fsClient client.XClient
fsCh chan *protocol.Message
srvVals map[string]chan interface{}
client *client.Client
}
func New(sockPath string) (*Client, error) {
d, err := client.NewPeer2PeerDiscovery("unix@"+sockPath, "")
conn, err := net.Dial("unix", sockPath)
if err != nil {
return nil, err
}
out := &Client{}
out.itdCh = make(chan *protocol.Message, 5)
out.itdClient = client.NewBidirectionalXClient(
"ITD",
client.Failtry,
client.RandomSelect,
d,
client.DefaultOption,
out.itdCh,
)
out.fsCh = make(chan *protocol.Message, 5)
out.fsClient = client.NewBidirectionalXClient(
"FS",
client.Failtry,
client.RandomSelect,
d,
client.DefaultOption,
out.fsCh,
)
out.srvVals = map[string]chan interface{}{}
go out.handleMessages(out.itdCh)
go out.handleMessages(out.fsCh)
out := &Client{
client: client.New(conn, codec.JSON),
}
return out, nil
}
func (c *Client) handleMessages(msgCh chan *protocol.Message) {
for msg := range msgCh {
_, ok := c.srvVals[msg.ServicePath]
if !ok {
c.srvVals[msg.ServicePath] = make(chan interface{}, 5)
}
//fmt.Printf("%+v\n", msg)
ch := c.srvVals[msg.ServicePath]
switch msg.ServiceMethod {
case "FSProgress":
var progress FSTransferProgress
msgpack.Unmarshal(msg.Payload, &progress)
ch <- progress
case "DFUProgress":
var progress infinitime.DFUProgress
msgpack.Unmarshal(msg.Payload, &progress)
ch <- progress
case "MotionSample":
var motionVals infinitime.MotionValues
msgpack.Unmarshal(msg.Payload, &motionVals)
ch <- motionVals
case "Done":
close(c.srvVals[msg.ServicePath])
delete(c.srvVals, msg.ServicePath)
default:
var value interface{}
msgpack.Unmarshal(msg.Payload, &value)
ch <- value
}
}
}
func (c *Client) done(id string) error {
return c.itdClient.Call(
context.Background(),
"Done",
id,
nil,
)
}
func (c *Client) Close() error {
err := c.itdClient.Close()
if err != nil {
return err
}
err = c.fsClient.Close()
if err != nil {
return err
}
close(c.itdCh)
close(c.fsCh)
return nil
return c.client.Close()
}

View File

@@ -1,40 +1,23 @@
package api
import (
"context"
"time"
"go.arsenm.dev/infinitime"
)
func (c *Client) FirmwareUpgrade(upgType UpgradeType, files ...string) (chan infinitime.DFUProgress, error) {
var id string
err := c.itdClient.Call(
context.Background(),
progressCh := make(chan infinitime.DFUProgress, 5)
err := c.client.Call(
"ITD",
"FirmwareUpgrade",
FwUpgradeData{
Type: upgType,
Files: files,
},
&id,
&progressCh,
)
if err != nil {
return nil, err
}
progressCh := make(chan infinitime.DFUProgress, 5)
go func() {
srvValCh, ok := c.srvVals[id]
for !ok {
time.Sleep(100 * time.Millisecond)
srvValCh, ok = c.srvVals[id]
}
for val := range srvValCh {
progressCh <- val.(infinitime.DFUProgress)
}
close(progressCh)
}()
return progressCh, nil
}

View File

@@ -1,13 +1,8 @@
package api
import (
"context"
"time"
)
func (c *Client) Remove(paths ...string) error {
return c.fsClient.Call(
context.Background(),
return c.client.Call(
"FS",
"Remove",
paths,
nil,
@@ -15,17 +10,17 @@ func (c *Client) Remove(paths ...string) error {
}
func (c *Client) Rename(old, new string) error {
return c.fsClient.Call(
context.Background(),
"Remove",
return c.client.Call(
"FS",
"Rename",
[2]string{old, new},
nil,
)
}
func (c *Client) Mkdir(paths ...string) error {
return c.fsClient.Call(
context.Background(),
return c.client.Call(
"FS",
"Mkdir",
paths,
nil,
@@ -33,8 +28,8 @@ func (c *Client) Mkdir(paths ...string) error {
}
func (c *Client) ReadDir(dir string) (out []FileInfo, err error) {
err = c.fsClient.Call(
context.Background(),
err = c.client.Call(
"FS",
"ReadDir",
dir,
&out,
@@ -43,59 +38,31 @@ func (c *Client) ReadDir(dir string) (out []FileInfo, err error) {
}
func (c *Client) Upload(dst, src string) (chan FSTransferProgress, error) {
var id string
err := c.fsClient.Call(
context.Background(),
progressCh := make(chan FSTransferProgress, 5)
err := c.client.Call(
"FS",
"Upload",
[2]string{dst, src},
&id,
progressCh,
)
if err != nil {
return nil, err
}
progressCh := make(chan FSTransferProgress, 5)
go func() {
srvValCh, ok := c.srvVals[id]
for !ok {
time.Sleep(100 * time.Millisecond)
srvValCh, ok = c.srvVals[id]
}
for val := range srvValCh {
progressCh <- val.(FSTransferProgress)
}
close(progressCh)
}()
return progressCh, nil
}
func (c *Client) Download(dst, src string) (chan FSTransferProgress, error) {
var id string
err := c.fsClient.Call(
context.Background(),
progressCh := make(chan FSTransferProgress, 5)
err := c.client.Call(
"FS",
"Download",
[2]string{dst, src},
&id,
progressCh,
)
if err != nil {
return nil, err
}
progressCh := make(chan FSTransferProgress, 5)
go func() {
srvValCh, ok := c.srvVals[id]
for !ok {
time.Sleep(100 * time.Millisecond)
srvValCh, ok = c.srvVals[id]
}
for val := range srvValCh {
progressCh <- val.(FSTransferProgress)
}
close(progressCh)
}()
return progressCh, nil
}

View File

@@ -1,14 +1,12 @@
package api
import (
"context"
"go.arsenm.dev/infinitime"
)
func (c *Client) HeartRate() (out uint8, err error) {
err = c.itdClient.Call(
context.Background(),
err = c.client.Call(
"ITD",
"HeartRate",
nil,
&out,
@@ -17,8 +15,8 @@ func (c *Client) HeartRate() (out uint8, err error) {
}
func (c *Client) BatteryLevel() (out uint8, err error) {
err = c.itdClient.Call(
context.Background(),
err = c.client.Call(
"ITD",
"BatteryLevel",
nil,
&out,
@@ -27,8 +25,8 @@ func (c *Client) BatteryLevel() (out uint8, err error) {
}
func (c *Client) Motion() (out infinitime.MotionValues, err error) {
err = c.itdClient.Call(
context.Background(),
err = c.client.Call(
"ITD",
"Motion",
nil,
&out,
@@ -37,8 +35,8 @@ func (c *Client) Motion() (out infinitime.MotionValues, err error) {
}
func (c *Client) StepCount() (out uint32, err error) {
err = c.itdClient.Call(
context.Background(),
err = c.client.Call(
"ITD",
"StepCount",
nil,
&out,
@@ -47,8 +45,8 @@ func (c *Client) StepCount() (out uint32, err error) {
}
func (c *Client) Version() (out string, err error) {
err = c.itdClient.Call(
context.Background(),
err = c.client.Call(
"ITD",
"Version",
nil,
&out,
@@ -57,8 +55,8 @@ func (c *Client) Version() (out string, err error) {
}
func (c *Client) Address() (out string, err error) {
err = c.itdClient.Call(
context.Background(),
err = c.client.Call(
"ITD",
"Address",
nil,
&out,

View File

@@ -1,17 +1,13 @@
package api
import (
"context"
)
func (c *Client) Notify(title, body string) error {
return c.itdClient.Call(
context.Background(),
return c.client.Call(
"ITD",
"Notify",
NotifyData{
Title: title,
Body: body,
Body: body,
},
nil,
)
}
}

View File

@@ -1,13 +1,12 @@
package api
import (
"context"
"time"
)
func (c *Client) SetTime(t time.Time) error {
return c.itdClient.Call(
context.Background(),
return c.client.Call(
"ITD",
"SetTime",
t,
nil,

View File

@@ -1,12 +1,10 @@
package api
import "context"
func (c *Client) WeatherUpdate() error {
return c.itdClient.Call(
context.Background(),
return c.client.Call(
"ITD",
"WeatherUpdate",
nil,
nil,
)
}
}

View File

@@ -1,143 +1,80 @@
package api
import (
"context"
"time"
"go.arsenm.dev/infinitime"
)
func (c *Client) WatchHeartRate() (<-chan uint8, func(), error) {
var id string
err := c.itdClient.Call(
context.Background(),
outCh := make(chan uint8, 2)
err := c.client.Call(
"ITD",
"WatchHeartRate",
nil,
&id,
outCh,
)
if err != nil {
return nil, nil, err
}
outCh := make(chan uint8, 2)
go func() {
srvValCh, ok := c.srvVals[id]
for !ok {
time.Sleep(100 * time.Millisecond)
srvValCh, ok = c.srvVals[id]
}
for val := range srvValCh {
outCh <- val.(uint8)
}
}()
doneFn := func() {
c.done(id)
close(c.srvVals[id])
delete(c.srvVals, id)
close(outCh)
}
return outCh, doneFn, nil
}
func (c *Client) WatchBatteryLevel() (<-chan uint8, func(), error) {
var id string
err := c.itdClient.Call(
context.Background(),
outCh := make(chan uint8, 2)
err := c.client.Call(
"ITD",
"WatchBatteryLevel",
nil,
&id,
outCh,
)
if err != nil {
return nil, nil, err
}
outCh := make(chan uint8, 2)
go func() {
srvValCh, ok := c.srvVals[id]
for !ok {
time.Sleep(100 * time.Millisecond)
srvValCh, ok = c.srvVals[id]
}
for val := range srvValCh {
outCh <- val.(uint8)
}
}()
doneFn := func() {
c.done(id)
close(c.srvVals[id])
delete(c.srvVals, id)
close(outCh)
}
return outCh, doneFn, nil
}
func (c *Client) WatchStepCount() (<-chan uint32, func(), error) {
var id string
err := c.itdClient.Call(
context.Background(),
outCh := make(chan uint32, 2)
err := c.client.Call(
"ITD",
"WatchStepCount",
nil,
&id,
outCh,
)
if err != nil {
return nil, nil, err
}
outCh := make(chan uint32, 2)
go func() {
srvValCh, ok := c.srvVals[id]
for !ok {
time.Sleep(100 * time.Millisecond)
srvValCh, ok = c.srvVals[id]
}
for val := range srvValCh {
outCh <- val.(uint32)
}
}()
doneFn := func() {
c.done(id)
close(c.srvVals[id])
delete(c.srvVals, id)
close(outCh)
}
return outCh, doneFn, nil
}
func (c *Client) WatchMotion() (<-chan infinitime.MotionValues, func(), error) {
var id string
err := c.itdClient.Call(
context.Background(),
outCh := make(chan infinitime.MotionValues, 2)
err := c.client.Call(
"ITD",
"WatchMotion",
nil,
&id,
outCh,
)
if err != nil {
return nil, nil, err
}
outCh := make(chan infinitime.MotionValues, 2)
go func() {
srvValCh, ok := c.srvVals[id]
for !ok {
time.Sleep(100 * time.Millisecond)
srvValCh, ok = c.srvVals[id]
}
for val := range srvValCh {
outCh <- val.(infinitime.MotionValues)
}
}()
doneFn := func() {
c.done(id)
close(c.srvVals[id])
delete(c.srvVals, id)
close(outCh)
}
return outCh, doneFn, nil