Move multiplexing code into separate module
This commit is contained in:
parent
053f8f50d7
commit
19a9f64525
@ -4,6 +4,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
|
|
||||||
|
"go.arsenm.dev/drpc/muxconn"
|
||||||
"go.arsenm.dev/itd/internal/rpc"
|
"go.arsenm.dev/itd/internal/rpc"
|
||||||
"storj.io/drpc"
|
"storj.io/drpc"
|
||||||
)
|
)
|
||||||
@ -25,7 +26,7 @@ func New(sockPath string) (*Client, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
mconn, err := newMuxConn(conn)
|
mconn, err := muxconn.New(conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -39,7 +40,7 @@ func New(sockPath string) (*Client, error) {
|
|||||||
// NewFromConn returns a client that communicates
|
// NewFromConn returns a client that communicates
|
||||||
// over the given connection.
|
// over the given connection.
|
||||||
func NewFromConn(conn io.ReadWriteCloser) (*Client, error) {
|
func NewFromConn(conn io.ReadWriteCloser) (*Client, error) {
|
||||||
mconn, err := newMuxConn(conn)
|
mconn, err := muxconn.New(conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
83
api/drpc.go
83
api/drpc.go
@ -1,83 +0,0 @@
|
|||||||
package api
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"io"
|
|
||||||
|
|
||||||
"github.com/hashicorp/yamux"
|
|
||||||
"storj.io/drpc"
|
|
||||||
"storj.io/drpc/drpcconn"
|
|
||||||
)
|
|
||||||
|
|
||||||
var _ drpc.Conn = &muxConn{}
|
|
||||||
|
|
||||||
// muxConn implements drpc.Conn using the yamux
|
|
||||||
// multiplexer to allow concurrent RPCs
|
|
||||||
type muxConn struct {
|
|
||||||
conn io.ReadWriteCloser
|
|
||||||
sess *yamux.Session
|
|
||||||
closed chan struct{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newMuxConn(conn io.ReadWriteCloser) (*muxConn, error) {
|
|
||||||
sess, err := yamux.Client(conn, nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &muxConn{
|
|
||||||
conn: conn,
|
|
||||||
sess: sess,
|
|
||||||
closed: make(chan struct{}),
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *muxConn) Close() error {
|
|
||||||
defer close(m.closed)
|
|
||||||
|
|
||||||
err := m.sess.Close()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return m.conn.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *muxConn) Closed() <-chan struct{} {
|
|
||||||
return m.closed
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *muxConn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) error {
|
|
||||||
conn, err := m.sess.Open()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer conn.Close()
|
|
||||||
dconn := drpcconn.New(conn)
|
|
||||||
defer dconn.Close()
|
|
||||||
return dconn.Invoke(ctx, rpc, enc, in, out)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *muxConn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (drpc.Stream, error) {
|
|
||||||
conn, err := m.sess.Open()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
dconn := drpcconn.New(conn)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
<-dconn.Closed()
|
|
||||||
conn.Close()
|
|
||||||
}()
|
|
||||||
|
|
||||||
s, err := dconn.NewStream(ctx, rpc, enc)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
<-s.Context().Done()
|
|
||||||
dconn.Close()
|
|
||||||
}()
|
|
||||||
|
|
||||||
return s, nil
|
|
||||||
}
|
|
1
go.mod
1
go.mod
@ -16,6 +16,7 @@ require (
|
|||||||
github.com/mozillazg/go-pinyin v0.19.0
|
github.com/mozillazg/go-pinyin v0.19.0
|
||||||
github.com/rs/zerolog v1.28.0
|
github.com/rs/zerolog v1.28.0
|
||||||
github.com/urfave/cli/v2 v2.23.7
|
github.com/urfave/cli/v2 v2.23.7
|
||||||
|
go.arsenm.dev/drpc v0.0.0-20230103210137-7219da760f6d
|
||||||
go.arsenm.dev/infinitime v0.0.0-20221122225335-2da80044b34a
|
go.arsenm.dev/infinitime v0.0.0-20221122225335-2da80044b34a
|
||||||
golang.org/x/text v0.5.0
|
golang.org/x/text v0.5.0
|
||||||
google.golang.org/protobuf v1.28.1
|
google.golang.org/protobuf v1.28.1
|
||||||
|
4
go.sum
4
go.sum
@ -527,6 +527,10 @@ github.com/yuin/goldmark v1.5.3/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5ta
|
|||||||
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
|
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
|
||||||
github.com/zeebo/errs v1.3.0 h1:hmiaKqgYZzcVgRL1Vkc1Mn2914BbzB0IBxs+ebeutGs=
|
github.com/zeebo/errs v1.3.0 h1:hmiaKqgYZzcVgRL1Vkc1Mn2914BbzB0IBxs+ebeutGs=
|
||||||
github.com/zeebo/errs v1.3.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
|
github.com/zeebo/errs v1.3.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
|
||||||
|
go.arsenm.dev/drpc v0.0.0-20230103205749-5dc01e6d22c7 h1:YJAOATYk33JWJyeZ8C/wu5WzxPCRMqIDLiR1OMn3hyg=
|
||||||
|
go.arsenm.dev/drpc v0.0.0-20230103205749-5dc01e6d22c7/go.mod h1:K5cFls42m5q1RIphTVojRdXLaoCknq/kBqQt8Ow3XuA=
|
||||||
|
go.arsenm.dev/drpc v0.0.0-20230103210137-7219da760f6d h1:LKeP4O5WwJus3YvfVILSok/tqX5CM9Qm2I7BvU9DYrE=
|
||||||
|
go.arsenm.dev/drpc v0.0.0-20230103210137-7219da760f6d/go.mod h1:K5cFls42m5q1RIphTVojRdXLaoCknq/kBqQt8Ow3XuA=
|
||||||
go.arsenm.dev/infinitime v0.0.0-20221122225335-2da80044b34a h1:4XmKIO0udIwVS9wqoi4hq76y3+X0MeV/GRMo2/01xSU=
|
go.arsenm.dev/infinitime v0.0.0-20221122225335-2da80044b34a h1:4XmKIO0udIwVS9wqoi4hq76y3+X0MeV/GRMo2/01xSU=
|
||||||
go.arsenm.dev/infinitime v0.0.0-20221122225335-2da80044b34a/go.mod h1:K3NJ6fyPv5qqHUedB3MccKOE0whJMJZ80l/yTzzTrgc=
|
go.arsenm.dev/infinitime v0.0.0-20221122225335-2da80044b34a/go.mod h1:K3NJ6fyPv5qqHUedB3MccKOE0whJMJZ80l/yTzzTrgc=
|
||||||
go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs=
|
go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs=
|
||||||
|
32
socket.go
32
socket.go
@ -27,13 +27,12 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/yamux"
|
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
|
"go.arsenm.dev/drpc/muxserver"
|
||||||
"go.arsenm.dev/infinitime"
|
"go.arsenm.dev/infinitime"
|
||||||
"go.arsenm.dev/infinitime/blefs"
|
"go.arsenm.dev/infinitime/blefs"
|
||||||
"go.arsenm.dev/itd/internal/rpc"
|
"go.arsenm.dev/itd/internal/rpc"
|
||||||
"storj.io/drpc/drpcmux"
|
"storj.io/drpc/drpcmux"
|
||||||
"storj.io/drpc/drpcserver"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -80,34 +79,7 @@ func startSocket(ctx context.Context, dev *infinitime.Device) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
srv := drpcserver.New(mux)
|
go muxserver.New(mux).Serve(ctx, ln)
|
||||||
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
conn, err := ln.Accept()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal().Err(err).Msg("Error accepting connection")
|
|
||||||
}
|
|
||||||
|
|
||||||
sess, err := yamux.Server(conn, nil)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal().Err(err).Msg("Error creating multiplexed session")
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
conn, err := sess.Accept()
|
|
||||||
if errors.Is(err, io.EOF) {
|
|
||||||
break
|
|
||||||
} else if err != nil {
|
|
||||||
log.Fatal().Err(err).Msg("Error accepting stream")
|
|
||||||
}
|
|
||||||
|
|
||||||
go srv.ServeOne(ctx, conn)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Log socket start
|
// Log socket start
|
||||||
log.Info().Str("path", k.String("socket.path")).Msg("Started control socket")
|
log.Info().Str("path", k.String("socket.path")).Msg("Started control socket")
|
||||||
|
Loading…
Reference in New Issue
Block a user