forked from Elara6331/itd
Switch to lrpc and use context to handle signals
This commit is contained in:
415
socket.go
415
socket.go
@@ -19,26 +19,20 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/smallnest/rpcxlite/server"
|
||||
"github.com/smallnest/rpcxlite/share"
|
||||
"github.com/vmihailenco/msgpack/v5"
|
||||
"go.arsenm.dev/infinitime"
|
||||
"go.arsenm.dev/infinitime/blefs"
|
||||
"go.arsenm.dev/itd/api"
|
||||
"go.arsenm.dev/lrpc/codec"
|
||||
"go.arsenm.dev/lrpc/server"
|
||||
)
|
||||
|
||||
// This type signifies an unneeded value.
|
||||
@@ -50,7 +44,6 @@ var (
|
||||
ErrDFUInvalidFile = errors.New("provided file is invalid for given upgrade type")
|
||||
ErrDFUNotEnoughFiles = errors.New("not enough files provided for given upgrade type")
|
||||
ErrDFUInvalidUpgType = errors.New("invalid upgrade type")
|
||||
ErrRPCXNoReturnURL = errors.New("bidirectional requests over gateway require a returnURL field in the metadata")
|
||||
)
|
||||
|
||||
type DoneMap map[string]chan struct{}
|
||||
@@ -100,13 +93,12 @@ func startSocket(dev *infinitime.Device) error {
|
||||
log.Warn().Err(err).Msg("Error getting BLE filesystem")
|
||||
}
|
||||
|
||||
srv := server.NewServer()
|
||||
srv := server.New()
|
||||
|
||||
itdAPI := &ITD{
|
||||
dev: dev,
|
||||
srv: srv,
|
||||
}
|
||||
err = srv.Register(itdAPI, "")
|
||||
err = srv.Register(itdAPI)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -114,14 +106,13 @@ func startSocket(dev *infinitime.Device) error {
|
||||
fsAPI := &FS{
|
||||
dev: dev,
|
||||
fs: fs,
|
||||
srv: srv,
|
||||
}
|
||||
err = srv.Register(fsAPI, "")
|
||||
err = srv.Register(fsAPI)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go srv.ServeListener("unix", ln)
|
||||
go srv.Serve(ln, codec.JSON)
|
||||
|
||||
// Log socket start
|
||||
log.Info().Str("path", k.String("socket.path")).Msg("Started control socket")
|
||||
@@ -131,21 +122,16 @@ func startSocket(dev *infinitime.Device) error {
|
||||
|
||||
type ITD struct {
|
||||
dev *infinitime.Device
|
||||
srv *server.Server
|
||||
}
|
||||
|
||||
func (i *ITD) HeartRate(_ context.Context, _ none, out *uint8) error {
|
||||
heartRate, err := i.dev.HeartRate()
|
||||
*out = heartRate
|
||||
return err
|
||||
func (i *ITD) HeartRate(_ *server.Context) (uint8, error) {
|
||||
return i.dev.HeartRate()
|
||||
}
|
||||
|
||||
func (i *ITD) WatchHeartRate(ctx context.Context, _ none, out *string) error {
|
||||
// Get client message sender
|
||||
msgSender, ok := getMsgSender(ctx, i.srv)
|
||||
// If user is using gateway, the client connection will not be available
|
||||
if !ok {
|
||||
return ErrRPCXNoReturnURL
|
||||
func (i *ITD) WatchHeartRate(ctx *server.Context) error {
|
||||
ch, err := ctx.MakeChannel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
heartRateCh, cancel, err := i.dev.WatchHeartRate()
|
||||
@@ -153,46 +139,32 @@ func (i *ITD) WatchHeartRate(ctx context.Context, _ none, out *string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
id := uuid.New().String()
|
||||
go func() {
|
||||
done.Create(id)
|
||||
// For every heart rate value
|
||||
for heartRate := range heartRateCh {
|
||||
for {
|
||||
select {
|
||||
case <-done[id]:
|
||||
case <-ctx.Done():
|
||||
fmt.Println("ctx done")
|
||||
// Stop notifications if done signal received
|
||||
cancel()
|
||||
done.Remove(id)
|
||||
return
|
||||
default:
|
||||
data, err := msgpack.Marshal(heartRate)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Error encoding heart rate")
|
||||
continue
|
||||
}
|
||||
|
||||
// Send response to connection if no done signal received
|
||||
msgSender.SendMessage(id, "HeartRateSample", nil, data)
|
||||
case heartRate := <-heartRateCh:
|
||||
ch <- heartRate
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
*out = id
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *ITD) BatteryLevel(_ context.Context, _ none, out *uint8) error {
|
||||
battLevel, err := i.dev.BatteryLevel()
|
||||
*out = battLevel
|
||||
return err
|
||||
func (i *ITD) BatteryLevel(_ *server.Context) (uint8, error) {
|
||||
return i.dev.BatteryLevel()
|
||||
}
|
||||
|
||||
func (i *ITD) WatchBatteryLevel(ctx context.Context, _ none, out *string) error {
|
||||
// Get client message sender
|
||||
msgSender, ok := getMsgSender(ctx, i.srv)
|
||||
// If user is using gateway, the client connection will not be available
|
||||
if !ok {
|
||||
return ErrRPCXNoReturnURL
|
||||
func (i *ITD) WatchBatteryLevel(ctx *server.Context) error {
|
||||
ch, err := ctx.MakeChannel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
battLevelCh, cancel, err := i.dev.WatchBatteryLevel()
|
||||
@@ -200,46 +172,31 @@ func (i *ITD) WatchBatteryLevel(ctx context.Context, _ none, out *string) error
|
||||
return err
|
||||
}
|
||||
|
||||
id := uuid.New().String()
|
||||
go func() {
|
||||
done.Create(id)
|
||||
// For every heart rate value
|
||||
for battLevel := range battLevelCh {
|
||||
for {
|
||||
select {
|
||||
case <-done[id]:
|
||||
case <-ctx.Done():
|
||||
// Stop notifications if done signal received
|
||||
cancel()
|
||||
done.Remove(id)
|
||||
return
|
||||
default:
|
||||
data, err := msgpack.Marshal(battLevel)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Error encoding battery level")
|
||||
continue
|
||||
}
|
||||
|
||||
// Send response to connection if no done signal received
|
||||
msgSender.SendMessage(id, "BatteryLevelSample", nil, data)
|
||||
case battLevel := <-battLevelCh:
|
||||
ch <- battLevel
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
*out = id
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *ITD) Motion(_ context.Context, _ none, out *infinitime.MotionValues) error {
|
||||
motionVals, err := i.dev.Motion()
|
||||
*out = motionVals
|
||||
return err
|
||||
func (i *ITD) Motion(_ *server.Context) (infinitime.MotionValues, error) {
|
||||
return i.dev.Motion()
|
||||
}
|
||||
|
||||
func (i *ITD) WatchMotion(ctx context.Context, _ none, out *string) error {
|
||||
// Get client message sender
|
||||
msgSender, ok := getMsgSender(ctx, i.srv)
|
||||
// If user is using gateway, the client connection will not be available
|
||||
if !ok {
|
||||
return ErrRPCXNoReturnURL
|
||||
func (i *ITD) WatchMotion(ctx *server.Context) error {
|
||||
ch, err := ctx.MakeChannel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
motionValsCh, cancel, err := i.dev.WatchMotion()
|
||||
@@ -247,46 +204,31 @@ func (i *ITD) WatchMotion(ctx context.Context, _ none, out *string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
id := uuid.New().String()
|
||||
go func() {
|
||||
done.Create(id)
|
||||
// For every heart rate value
|
||||
for motionVals := range motionValsCh {
|
||||
for {
|
||||
select {
|
||||
case <-done[id]:
|
||||
case <-ctx.Done():
|
||||
// Stop notifications if done signal received
|
||||
cancel()
|
||||
done.Remove(id)
|
||||
return
|
||||
default:
|
||||
data, err := msgpack.Marshal(motionVals)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Error encoding motion values")
|
||||
continue
|
||||
}
|
||||
|
||||
// Send response to connection if no done signal received
|
||||
msgSender.SendMessage(id, "MotionSample", nil, data)
|
||||
case motionVals := <-motionValsCh:
|
||||
ch <- motionVals
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
*out = id
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *ITD) StepCount(_ context.Context, _ none, out *uint32) error {
|
||||
stepCount, err := i.dev.StepCount()
|
||||
*out = stepCount
|
||||
return err
|
||||
func (i *ITD) StepCount(_ *server.Context) (uint32, error) {
|
||||
return i.dev.StepCount()
|
||||
}
|
||||
|
||||
func (i *ITD) WatchStepCount(ctx context.Context, _ none, out *string) error {
|
||||
// Get client message sender
|
||||
msgSender, ok := getMsgSender(ctx, i.srv)
|
||||
// If user is using gateway, the client connection will not be available
|
||||
if !ok {
|
||||
return ErrRPCXNoReturnURL
|
||||
func (i *ITD) WatchStepCount(ctx *server.Context) error {
|
||||
ch, err := ctx.MakeChannel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
stepCountCh, cancel, err := i.dev.WatchStepCount()
|
||||
@@ -294,60 +236,44 @@ func (i *ITD) WatchStepCount(ctx context.Context, _ none, out *string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
id := uuid.New().String()
|
||||
go func() {
|
||||
done.Create(id)
|
||||
// For every heart rate value
|
||||
for stepCount := range stepCountCh {
|
||||
for {
|
||||
select {
|
||||
case <-done[id]:
|
||||
case <-ctx.Done():
|
||||
// Stop notifications if done signal received
|
||||
cancel()
|
||||
done.Remove(id)
|
||||
return
|
||||
default:
|
||||
data, err := msgpack.Marshal(stepCount)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Error encoding step count")
|
||||
continue
|
||||
}
|
||||
|
||||
// Send response to connection if no done signal received
|
||||
msgSender.SendMessage(id, "StepCountSample", nil, data)
|
||||
case stepCount := <-stepCountCh:
|
||||
ch <- stepCount
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
*out = id
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *ITD) Version(_ context.Context, _ none, out *string) error {
|
||||
version, err := i.dev.Version()
|
||||
*out = version
|
||||
return err
|
||||
func (i *ITD) Version(_ *server.Context) (string, error) {
|
||||
return i.dev.Version()
|
||||
}
|
||||
|
||||
func (i *ITD) Address(_ context.Context, _ none, out *string) error {
|
||||
addr := i.dev.Address()
|
||||
*out = addr
|
||||
return nil
|
||||
func (i *ITD) Address(_ *server.Context) string {
|
||||
return i.dev.Address()
|
||||
}
|
||||
|
||||
func (i *ITD) Notify(_ context.Context, data api.NotifyData, _ *none) error {
|
||||
func (i *ITD) Notify(_ *server.Context, data api.NotifyData) error {
|
||||
return i.dev.Notify(data.Title, data.Body)
|
||||
}
|
||||
|
||||
func (i *ITD) SetTime(_ context.Context, t time.Time, _ *none) error {
|
||||
return i.dev.SetTime(t)
|
||||
func (i *ITD) SetTime(_ *server.Context, t *time.Time) error {
|
||||
return i.dev.SetTime(*t)
|
||||
}
|
||||
|
||||
func (i *ITD) WeatherUpdate(_ context.Context, _ none, _ *none) error {
|
||||
func (i *ITD) WeatherUpdate(_ *server.Context) {
|
||||
sendWeatherCh <- struct{}{}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *ITD) FirmwareUpgrade(ctx context.Context, reqData api.FwUpgradeData, out *string) error {
|
||||
func (i *ITD) FirmwareUpgrade(ctx *server.Context, reqData api.FwUpgradeData) error {
|
||||
i.dev.DFU.Reset()
|
||||
|
||||
switch reqData.Type {
|
||||
@@ -387,30 +313,22 @@ func (i *ITD) FirmwareUpgrade(ctx context.Context, reqData api.FwUpgradeData, ou
|
||||
return ErrDFUInvalidUpgType
|
||||
}
|
||||
|
||||
id := uuid.New().String()
|
||||
*out = id
|
||||
|
||||
// Get client message sender
|
||||
msgSender, ok := getMsgSender(ctx, i.srv)
|
||||
// If user is using gateway, the client connection will not be available
|
||||
if ok {
|
||||
go func() {
|
||||
// For every progress event
|
||||
for event := range i.dev.DFU.Progress() {
|
||||
data, err := msgpack.Marshal(event)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Error encoding DFU progress event")
|
||||
continue
|
||||
}
|
||||
|
||||
msgSender.SendMessage(id, "DFUProgress", nil, data)
|
||||
}
|
||||
|
||||
firmwareUpdating = false
|
||||
msgSender.SendMessage(id, "Done", nil, nil)
|
||||
}()
|
||||
ch, err := ctx.MakeChannel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go func() {
|
||||
// For every progress event
|
||||
for event := range i.dev.DFU.Progress() {
|
||||
ch <- event
|
||||
}
|
||||
|
||||
firmwareUpdating = false
|
||||
// Send zero object to signal completion
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
// Set firmwareUpdating
|
||||
firmwareUpdating = true
|
||||
|
||||
@@ -427,18 +345,12 @@ func (i *ITD) FirmwareUpgrade(ctx context.Context, reqData api.FwUpgradeData, ou
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *ITD) Done(_ context.Context, id string, _ *none) error {
|
||||
done.Done(id)
|
||||
return nil
|
||||
}
|
||||
|
||||
type FS struct {
|
||||
dev *infinitime.Device
|
||||
fs *blefs.FS
|
||||
srv *server.Server
|
||||
}
|
||||
|
||||
func (fs *FS) Remove(_ context.Context, paths []string, _ *none) error {
|
||||
func (fs *FS) Remove(_ *server.Context, paths []string) error {
|
||||
fs.updateFS()
|
||||
for _, path := range paths {
|
||||
err := fs.fs.Remove(path)
|
||||
@@ -449,12 +361,12 @@ func (fs *FS) Remove(_ context.Context, paths []string, _ *none) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fs *FS) Rename(_ context.Context, paths [2]string, _ *none) error {
|
||||
func (fs *FS) Rename(_ *server.Context, paths [2]string) error {
|
||||
fs.updateFS()
|
||||
return fs.fs.Rename(paths[0], paths[1])
|
||||
}
|
||||
|
||||
func (fs *FS) Mkdir(_ context.Context, paths []string, _ *none) error {
|
||||
func (fs *FS) Mkdir(_ *server.Context, paths []string) error {
|
||||
fs.updateFS()
|
||||
for _, path := range paths {
|
||||
err := fs.fs.Mkdir(path)
|
||||
@@ -465,18 +377,18 @@ func (fs *FS) Mkdir(_ context.Context, paths []string, _ *none) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fs *FS) ReadDir(_ context.Context, dir string, out *[]api.FileInfo) error {
|
||||
func (fs *FS) ReadDir(_ *server.Context, dir string) ([]api.FileInfo, error) {
|
||||
fs.updateFS()
|
||||
|
||||
entries, err := fs.fs.ReadDir(dir)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
var fileInfo []api.FileInfo
|
||||
for _, entry := range entries {
|
||||
info, err := entry.Info()
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
fileInfo = append(fileInfo, api.FileInfo{
|
||||
Name: info.Name(),
|
||||
@@ -485,11 +397,10 @@ func (fs *FS) ReadDir(_ context.Context, dir string, out *[]api.FileInfo) error
|
||||
})
|
||||
}
|
||||
|
||||
*out = fileInfo
|
||||
return nil
|
||||
return fileInfo, nil
|
||||
}
|
||||
|
||||
func (fs *FS) Upload(ctx context.Context, paths [2]string, out *string) error {
|
||||
func (fs *FS) Upload(ctx *server.Context, paths [2]string) error {
|
||||
fs.updateFS()
|
||||
|
||||
localFile, err := os.Open(paths[1])
|
||||
@@ -507,31 +418,22 @@ func (fs *FS) Upload(ctx context.Context, paths [2]string, out *string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
id := uuid.New().String()
|
||||
*out = id
|
||||
|
||||
// Get client message sender
|
||||
msgSender, ok := getMsgSender(ctx, fs.srv)
|
||||
// If user is using gateway, the client connection will not be available
|
||||
if ok {
|
||||
go func() {
|
||||
// For every progress event
|
||||
for sent := range remoteFile.Progress() {
|
||||
data, err := msgpack.Marshal(api.FSTransferProgress{
|
||||
Total: remoteFile.Size(),
|
||||
Sent: sent,
|
||||
})
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Error encoding filesystem transfer progress event")
|
||||
continue
|
||||
}
|
||||
|
||||
msgSender.SendMessage(id, "FSProgress", nil, data)
|
||||
}
|
||||
|
||||
msgSender.SendMessage(id, "Done", nil, nil)
|
||||
}()
|
||||
ch, err := ctx.MakeChannel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
// For every progress event
|
||||
for sent := range remoteFile.Progress() {
|
||||
ch <- api.FSTransferProgress{
|
||||
Total: remoteFile.Size(),
|
||||
Sent: sent,
|
||||
}
|
||||
}
|
||||
|
||||
// Send zero object to signal completion
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
io.Copy(remoteFile, localFile)
|
||||
@@ -542,7 +444,7 @@ func (fs *FS) Upload(ctx context.Context, paths [2]string, out *string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fs *FS) Download(ctx context.Context, paths [2]string, out *string) error {
|
||||
func (fs *FS) Download(ctx *server.Context, paths [2]string) error {
|
||||
fs.updateFS()
|
||||
|
||||
localFile, err := os.Create(paths[0])
|
||||
@@ -555,33 +457,24 @@ func (fs *FS) Download(ctx context.Context, paths [2]string, out *string) error
|
||||
return err
|
||||
}
|
||||
|
||||
id := uuid.New().String()
|
||||
*out = id
|
||||
|
||||
// Get client message sender
|
||||
msgSender, ok := getMsgSender(ctx, fs.srv)
|
||||
// If user is using gateway, the client connection will not be available
|
||||
if ok {
|
||||
go func() {
|
||||
// For every progress event
|
||||
for rcvd := range remoteFile.Progress() {
|
||||
data, err := msgpack.Marshal(api.FSTransferProgress{
|
||||
Total: remoteFile.Size(),
|
||||
Sent: rcvd,
|
||||
})
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Error encoding filesystem transfer progress event")
|
||||
continue
|
||||
}
|
||||
|
||||
msgSender.SendMessage(id, "FSProgress", nil, data)
|
||||
}
|
||||
|
||||
msgSender.SendMessage(id, "Done", nil, nil)
|
||||
localFile.Close()
|
||||
remoteFile.Close()
|
||||
}()
|
||||
ch, err := ctx.MakeChannel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
// For every progress event
|
||||
for sent := range remoteFile.Progress() {
|
||||
ch <- api.FSTransferProgress{
|
||||
Total: remoteFile.Size(),
|
||||
Sent: sent,
|
||||
}
|
||||
}
|
||||
|
||||
// Send zero object to signal completion
|
||||
close(ch)
|
||||
localFile.Close()
|
||||
remoteFile.Close()
|
||||
}()
|
||||
|
||||
go io.Copy(localFile, remoteFile)
|
||||
|
||||
@@ -602,87 +495,3 @@ func (fs *FS) updateFS() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// cleanPaths runs strings.TrimSpace and filepath.Clean
|
||||
// on all inputs, and returns the updated slice
|
||||
func cleanPaths(paths []string) []string {
|
||||
for index, path := range paths {
|
||||
newPath := strings.TrimSpace(path)
|
||||
paths[index] = filepath.Clean(newPath)
|
||||
}
|
||||
return paths
|
||||
}
|
||||
|
||||
func getMsgSender(ctx context.Context, srv *server.Server) (MessageSender, bool) {
|
||||
// Get client message sender
|
||||
clientConn, ok := ctx.Value(server.RemoteConnContextKey).(net.Conn)
|
||||
// If the connection exists, use rpcMsgSender
|
||||
if ok {
|
||||
return &rpcMsgSender{srv, clientConn}, true
|
||||
} else {
|
||||
// Get metadata if it exists
|
||||
metadata, ok := ctx.Value(share.ReqMetaDataKey).(map[string]string)
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
// Get returnURL field from metadata if it exists
|
||||
returnURL, ok := metadata["returnURL"]
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
// Use httpMsgSender
|
||||
return &httpMsgSender{returnURL}, true
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// The MessageSender interface sends messages to the client
|
||||
type MessageSender interface {
|
||||
SendMessage(servicePath, serviceMethod string, metadata map[string]string, data []byte) error
|
||||
}
|
||||
|
||||
// rpcMsgSender sends messages using RPCX, for clients that support it
|
||||
type rpcMsgSender struct {
|
||||
srv *server.Server
|
||||
conn net.Conn
|
||||
}
|
||||
|
||||
// SendMessage uses the server to send an RPCX message back to the client
|
||||
func (r *rpcMsgSender) SendMessage(servicePath, serviceMethod string, metadata map[string]string, data []byte) error {
|
||||
return r.srv.SendMessage(r.conn, servicePath, serviceMethod, metadata, data)
|
||||
}
|
||||
|
||||
// httpMsgSender sends messages to the given return URL, for clients that provide it
|
||||
type httpMsgSender struct {
|
||||
url string
|
||||
}
|
||||
|
||||
// SendMessage uses HTTP to send a message back to the client
|
||||
func (h *httpMsgSender) SendMessage(servicePath, serviceMethod string, metadata map[string]string, data []byte) error {
|
||||
// Create new POST request with provided URL
|
||||
req, err := http.NewRequest(http.MethodPost, h.url, bytes.NewReader(data))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Set service path and method headers
|
||||
req.Header.Set("X-RPCX-ServicePath", servicePath)
|
||||
req.Header.Set("X-RPCX-ServiceMethod", serviceMethod)
|
||||
|
||||
// Create new URL query values
|
||||
query := url.Values{}
|
||||
// Transfer values from metadata to query
|
||||
for k, v := range metadata {
|
||||
query.Set(k, v)
|
||||
}
|
||||
// Set metadata header by encoding query values
|
||||
req.Header.Set("X-RPCX-Meta", query.Encode())
|
||||
|
||||
// Perform request
|
||||
res, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Close body
|
||||
return res.Body.Close()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user