Move multiplexing code into separate module

This commit is contained in:
2023-01-03 13:06:38 -08:00
parent 053f8f50d7
commit 19a9f64525
5 changed files with 10 additions and 115 deletions

View File

@@ -4,6 +4,7 @@ import (
"io"
"net"
"go.arsenm.dev/drpc/muxconn"
"go.arsenm.dev/itd/internal/rpc"
"storj.io/drpc"
)
@@ -25,7 +26,7 @@ func New(sockPath string) (*Client, error) {
return nil, err
}
mconn, err := newMuxConn(conn)
mconn, err := muxconn.New(conn)
if err != nil {
return nil, err
}
@@ -39,7 +40,7 @@ func New(sockPath string) (*Client, error) {
// NewFromConn returns a client that communicates
// over the given connection.
func NewFromConn(conn io.ReadWriteCloser) (*Client, error) {
mconn, err := newMuxConn(conn)
mconn, err := muxconn.New(conn)
if err != nil {
return nil, err
}

View File

@@ -1,83 +0,0 @@
package api
import (
"context"
"io"
"github.com/hashicorp/yamux"
"storj.io/drpc"
"storj.io/drpc/drpcconn"
)
var _ drpc.Conn = &muxConn{}
// muxConn implements drpc.Conn using the yamux
// multiplexer to allow concurrent RPCs
type muxConn struct {
conn io.ReadWriteCloser
sess *yamux.Session
closed chan struct{}
}
func newMuxConn(conn io.ReadWriteCloser) (*muxConn, error) {
sess, err := yamux.Client(conn, nil)
if err != nil {
return nil, err
}
return &muxConn{
conn: conn,
sess: sess,
closed: make(chan struct{}),
}, nil
}
func (m *muxConn) Close() error {
defer close(m.closed)
err := m.sess.Close()
if err != nil {
return err
}
return m.conn.Close()
}
func (m *muxConn) Closed() <-chan struct{} {
return m.closed
}
func (m *muxConn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) error {
conn, err := m.sess.Open()
if err != nil {
return err
}
defer conn.Close()
dconn := drpcconn.New(conn)
defer dconn.Close()
return dconn.Invoke(ctx, rpc, enc, in, out)
}
func (m *muxConn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (drpc.Stream, error) {
conn, err := m.sess.Open()
if err != nil {
return nil, err
}
dconn := drpcconn.New(conn)
go func() {
<-dconn.Closed()
conn.Close()
}()
s, err := dconn.NewStream(ctx, rpc, enc)
if err != nil {
return nil, err
}
go func() {
<-s.Context().Done()
dconn.Close()
}()
return s, nil
}