From c3a61b58934a9e2a2b57a057b3c418a429b75a4e Mon Sep 17 00:00:00 2001 From: Arsen Musayelyan Date: Tue, 3 Jan 2023 13:06:38 -0800 Subject: [PATCH] Move multiplexing code into separate module --- api/api.go | 5 ++-- api/drpc.go | 83 ----------------------------------------------------- go.mod | 1 + go.sum | 4 +++ socket.go | 32 ++------------------- 5 files changed, 10 insertions(+), 115 deletions(-) delete mode 100644 api/drpc.go diff --git a/api/api.go b/api/api.go index aeb9656..0f59e48 100644 --- a/api/api.go +++ b/api/api.go @@ -4,6 +4,7 @@ import ( "io" "net" + "go.arsenm.dev/drpc/muxconn" "go.arsenm.dev/itd/internal/rpc" "storj.io/drpc" ) @@ -25,7 +26,7 @@ func New(sockPath string) (*Client, error) { return nil, err } - mconn, err := newMuxConn(conn) + mconn, err := muxconn.New(conn) if err != nil { return nil, err } @@ -39,7 +40,7 @@ func New(sockPath string) (*Client, error) { // NewFromConn returns a client that communicates // over the given connection. func NewFromConn(conn io.ReadWriteCloser) (*Client, error) { - mconn, err := newMuxConn(conn) + mconn, err := muxconn.New(conn) if err != nil { return nil, err } diff --git a/api/drpc.go b/api/drpc.go deleted file mode 100644 index 9f0988b..0000000 --- a/api/drpc.go +++ /dev/null @@ -1,83 +0,0 @@ -package api - -import ( - "context" - "io" - - "github.com/hashicorp/yamux" - "storj.io/drpc" - "storj.io/drpc/drpcconn" -) - -var _ drpc.Conn = &muxConn{} - -// muxConn implements drpc.Conn using the yamux -// multiplexer to allow concurrent RPCs -type muxConn struct { - conn io.ReadWriteCloser - sess *yamux.Session - closed chan struct{} -} - -func newMuxConn(conn io.ReadWriteCloser) (*muxConn, error) { - sess, err := yamux.Client(conn, nil) - if err != nil { - return nil, err - } - - return &muxConn{ - conn: conn, - sess: sess, - closed: make(chan struct{}), - }, nil -} - -func (m *muxConn) Close() error { - defer close(m.closed) - - err := m.sess.Close() - if err != nil { - return err - } - return m.conn.Close() -} - -func (m *muxConn) Closed() <-chan struct{} { - return m.closed -} - -func (m *muxConn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) error { - conn, err := m.sess.Open() - if err != nil { - return err - } - defer conn.Close() - dconn := drpcconn.New(conn) - defer dconn.Close() - return dconn.Invoke(ctx, rpc, enc, in, out) -} - -func (m *muxConn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (drpc.Stream, error) { - conn, err := m.sess.Open() - if err != nil { - return nil, err - } - dconn := drpcconn.New(conn) - - go func() { - <-dconn.Closed() - conn.Close() - }() - - s, err := dconn.NewStream(ctx, rpc, enc) - if err != nil { - return nil, err - } - - go func() { - <-s.Context().Done() - dconn.Close() - }() - - return s, nil -} diff --git a/go.mod b/go.mod index d3f109a..a3547fe 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/mozillazg/go-pinyin v0.19.0 github.com/rs/zerolog v1.28.0 github.com/urfave/cli/v2 v2.23.7 + go.arsenm.dev/drpc v0.0.0-20230103210137-7219da760f6d go.arsenm.dev/infinitime v0.0.0-20221122225335-2da80044b34a golang.org/x/text v0.5.0 google.golang.org/protobuf v1.28.1 diff --git a/go.sum b/go.sum index 1592110..f9ad9ff 100644 --- a/go.sum +++ b/go.sum @@ -527,6 +527,10 @@ github.com/yuin/goldmark v1.5.3/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5ta github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/errs v1.3.0 h1:hmiaKqgYZzcVgRL1Vkc1Mn2914BbzB0IBxs+ebeutGs= github.com/zeebo/errs v1.3.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4= +go.arsenm.dev/drpc v0.0.0-20230103205749-5dc01e6d22c7 h1:YJAOATYk33JWJyeZ8C/wu5WzxPCRMqIDLiR1OMn3hyg= +go.arsenm.dev/drpc v0.0.0-20230103205749-5dc01e6d22c7/go.mod h1:K5cFls42m5q1RIphTVojRdXLaoCknq/kBqQt8Ow3XuA= +go.arsenm.dev/drpc v0.0.0-20230103210137-7219da760f6d h1:LKeP4O5WwJus3YvfVILSok/tqX5CM9Qm2I7BvU9DYrE= +go.arsenm.dev/drpc v0.0.0-20230103210137-7219da760f6d/go.mod h1:K5cFls42m5q1RIphTVojRdXLaoCknq/kBqQt8Ow3XuA= go.arsenm.dev/infinitime v0.0.0-20221122225335-2da80044b34a h1:4XmKIO0udIwVS9wqoi4hq76y3+X0MeV/GRMo2/01xSU= go.arsenm.dev/infinitime v0.0.0-20221122225335-2da80044b34a/go.mod h1:K3NJ6fyPv5qqHUedB3MccKOE0whJMJZ80l/yTzzTrgc= go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs= diff --git a/socket.go b/socket.go index dcfe7f4..91dc7b4 100644 --- a/socket.go +++ b/socket.go @@ -27,13 +27,12 @@ import ( "path/filepath" "time" - "github.com/hashicorp/yamux" "github.com/rs/zerolog/log" + "go.arsenm.dev/drpc/muxserver" "go.arsenm.dev/infinitime" "go.arsenm.dev/infinitime/blefs" "go.arsenm.dev/itd/internal/rpc" "storj.io/drpc/drpcmux" - "storj.io/drpc/drpcserver" ) var ( @@ -80,34 +79,7 @@ func startSocket(ctx context.Context, dev *infinitime.Device) error { return err } - srv := drpcserver.New(mux) - - go func() { - for { - conn, err := ln.Accept() - if err != nil { - log.Fatal().Err(err).Msg("Error accepting connection") - } - - sess, err := yamux.Server(conn, nil) - if err != nil { - log.Fatal().Err(err).Msg("Error creating multiplexed session") - } - - go func() { - for { - conn, err := sess.Accept() - if errors.Is(err, io.EOF) { - break - } else if err != nil { - log.Fatal().Err(err).Msg("Error accepting stream") - } - - go srv.ServeOne(ctx, conn) - } - }() - } - }() + go muxserver.New(mux).Serve(ctx, ln) // Log socket start log.Info().Str("path", k.String("socket.path")).Msg("Started control socket")