Add connection multiplexing, fixing itgui
This commit is contained in:
parent
01919e67a3
commit
b656c69350
13
api/api.go
13
api/api.go
@ -5,13 +5,14 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
|
|
||||||
"go.arsenm.dev/itd/internal/rpc"
|
"go.arsenm.dev/itd/internal/rpc"
|
||||||
|
"storj.io/drpc"
|
||||||
"storj.io/drpc/drpcconn"
|
"storj.io/drpc/drpcconn"
|
||||||
)
|
)
|
||||||
|
|
||||||
const DefaultAddr = "/tmp/itd/socket"
|
const DefaultAddr = "/tmp/itd/socket"
|
||||||
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
conn *drpcconn.Conn
|
conn drpc.Conn
|
||||||
client rpc.DRPCITDClient
|
client rpc.DRPCITDClient
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -20,11 +21,15 @@ func New(sockPath string) (*Client, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
dconn := drpcconn.New(conn)
|
|
||||||
|
mconn, err := newMuxConn(conn)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return &Client{
|
return &Client{
|
||||||
conn: dconn,
|
conn: mconn,
|
||||||
client: rpc.NewDRPCITDClient(dconn),
|
client: rpc.NewDRPCITDClient(mconn),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
65
api/drpc.go
Normal file
65
api/drpc.go
Normal file
@ -0,0 +1,65 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/hashicorp/yamux"
|
||||||
|
"storj.io/drpc"
|
||||||
|
"storj.io/drpc/drpcconn"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ drpc.Conn = &muxConn{}
|
||||||
|
|
||||||
|
type muxConn struct {
|
||||||
|
conn io.ReadWriteCloser
|
||||||
|
sess *yamux.Session
|
||||||
|
closed chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMuxConn(conn io.ReadWriteCloser) (*muxConn, error) {
|
||||||
|
sess, err := yamux.Client(conn, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &muxConn{
|
||||||
|
conn: conn,
|
||||||
|
sess: sess,
|
||||||
|
closed: make(chan struct{}),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *muxConn) Close() error {
|
||||||
|
defer close(m.closed)
|
||||||
|
|
||||||
|
err := m.sess.Close()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return m.conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *muxConn) Closed() <-chan struct{} {
|
||||||
|
return m.closed
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *muxConn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) error {
|
||||||
|
conn, err := m.sess.Open()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
dconn := drpcconn.New(conn)
|
||||||
|
return dconn.Invoke(ctx, rpc, enc, in, out)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *muxConn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (drpc.Stream, error) {
|
||||||
|
conn, err := m.sess.Open()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
dconn := drpcconn.New(conn)
|
||||||
|
return dconn.NewStream(ctx, rpc, enc)
|
||||||
|
}
|
1
go.mod
1
go.mod
@ -10,6 +10,7 @@ require (
|
|||||||
github.com/cheggaaa/pb/v3 v3.1.0
|
github.com/cheggaaa/pb/v3 v3.1.0
|
||||||
github.com/gen2brain/dlgs v0.0.0-20220603100644-40c77870fa8d
|
github.com/gen2brain/dlgs v0.0.0-20220603100644-40c77870fa8d
|
||||||
github.com/godbus/dbus/v5 v5.1.0
|
github.com/godbus/dbus/v5 v5.1.0
|
||||||
|
github.com/hashicorp/yamux v0.1.1
|
||||||
github.com/knadh/koanf v1.4.4
|
github.com/knadh/koanf v1.4.4
|
||||||
github.com/mattn/go-isatty v0.0.17
|
github.com/mattn/go-isatty v0.0.17
|
||||||
github.com/mozillazg/go-pinyin v0.19.0
|
github.com/mozillazg/go-pinyin v0.19.0
|
||||||
|
2
go.sum
2
go.sum
@ -297,6 +297,8 @@ github.com/hashicorp/vault/api v1.0.4/go.mod h1:gDcqh3WGcR1cpF5AJz/B1UFheUEneMoI
|
|||||||
github.com/hashicorp/vault/sdk v0.1.13/go.mod h1:B+hVj7TpuQY1Y/GPbCpffmgd+tSEwvhkWnjtSYCaS2M=
|
github.com/hashicorp/vault/sdk v0.1.13/go.mod h1:B+hVj7TpuQY1Y/GPbCpffmgd+tSEwvhkWnjtSYCaS2M=
|
||||||
github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
|
github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
|
||||||
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
|
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
|
||||||
|
github.com/hashicorp/yamux v0.1.1 h1:yrQxtgseBDrq9Y652vSRDvsKCJKOUD+GzTS4Y0Y8pvE=
|
||||||
|
github.com/hashicorp/yamux v0.1.1/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ=
|
||||||
github.com/hjson/hjson-go/v4 v4.0.0 h1:wlm6IYYqHjOdXH1gHev4VoXCaW20HdQAGCxdOEEg2cs=
|
github.com/hjson/hjson-go/v4 v4.0.0 h1:wlm6IYYqHjOdXH1gHev4VoXCaW20HdQAGCxdOEEg2cs=
|
||||||
github.com/hjson/hjson-go/v4 v4.0.0/go.mod h1:KaYt3bTw3zhBjYqnXkYywcYctk0A2nxeEFTse3rH13E=
|
github.com/hjson/hjson-go/v4 v4.0.0/go.mod h1:KaYt3bTw3zhBjYqnXkYywcYctk0A2nxeEFTse3rH13E=
|
||||||
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
|
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
|
||||||
|
26
socket.go
26
socket.go
@ -27,6 +27,7 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/yamux"
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
"go.arsenm.dev/infinitime"
|
"go.arsenm.dev/infinitime"
|
||||||
"go.arsenm.dev/infinitime/blefs"
|
"go.arsenm.dev/infinitime/blefs"
|
||||||
@ -79,7 +80,30 @@ func startSocket(ctx context.Context, dev *infinitime.Device) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
go drpcserver.New(mux).Serve(ctx, ln)
|
srv := drpcserver.New(mux)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
conn, err := ln.Accept()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal().Err(err).Msg("Error accepting connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
sess, err := yamux.Server(conn, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal().Err(err).Msg("Error creating multiplexed session")
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
conn, err := sess.Accept()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal().Err(err).Msg("Error accepting stream")
|
||||||
|
}
|
||||||
|
|
||||||
|
go srv.ServeOne(ctx, conn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// Log socket start
|
// Log socket start
|
||||||
log.Info().Str("path", k.String("socket.path")).Msg("Started control socket")
|
log.Info().Str("path", k.String("socket.path")).Msg("Started control socket")
|
||||||
|
Loading…
Reference in New Issue
Block a user