drpc/muxconn/muxconn.go

108 lines
2.0 KiB
Go
Raw Normal View History

2023-01-03 21:01:37 +00:00
package muxconn
2023-01-03 20:56:09 +00:00
import (
"context"
2023-01-04 22:12:10 +00:00
"errors"
2023-01-03 20:56:09 +00:00
"io"
"github.com/hashicorp/yamux"
"storj.io/drpc"
"storj.io/drpc/drpcconn"
)
2023-01-04 22:12:10 +00:00
var ErrClosed = errors.New("connection closed")
2023-01-03 20:56:09 +00:00
var _ drpc.Conn = &Conn{}
// Conn implements drpc.Conn using the yamux
// multiplexer to allow concurrent RPCs
type Conn struct {
conn io.ReadWriteCloser
sess *yamux.Session
isClosed bool
closed chan struct{}
2023-01-03 20:56:09 +00:00
}
// New returns a new multiplexed DRPC connection
func New(conn io.ReadWriteCloser) (*Conn, error) {
sess, err := yamux.Client(conn, nil)
if err != nil {
return nil, err
}
return &Conn{
conn: conn,
sess: sess,
closed: make(chan struct{}),
}, nil
}
// Close closes the multiplexer session
// and the underlying connection.
func (m *Conn) Close() error {
if !m.isClosed {
m.isClosed = true
defer close(m.closed)
2023-01-03 20:56:09 +00:00
err := m.sess.Close()
if err != nil {
m.conn.Close()
return err
}
return m.conn.Close()
2023-01-03 20:56:09 +00:00
}
return nil
2023-01-03 20:56:09 +00:00
}
// Closed returns a channel that will be closed
// when the connection is closed
func (m *Conn) Closed() <-chan struct{} {
return m.closed
}
// Invoke issues the rpc on the transport serializing in, waits for a response, and deserializes it into out.
func (m *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) error {
2023-01-04 22:12:10 +00:00
if m.isClosed {
return ErrClosed
}
2023-01-03 20:56:09 +00:00
conn, err := m.sess.Open()
if err != nil {
return err
}
defer conn.Close()
dconn := drpcconn.New(conn)
defer dconn.Close()
return dconn.Invoke(ctx, rpc, enc, in, out)
}
// NewStream begins a streaming rpc on the connection.
func (m *Conn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (drpc.Stream, error) {
2023-01-04 22:12:10 +00:00
if m.isClosed {
return nil, ErrClosed
}
2023-01-03 20:56:09 +00:00
conn, err := m.sess.Open()
if err != nil {
return nil, err
}
dconn := drpcconn.New(conn)
go func() {
<-dconn.Closed()
conn.Close()
}()
s, err := dconn.NewStream(ctx, rpc, enc)
if err != nil {
return nil, err
}
go func() {
<-s.Context().Done()
dconn.Close()
}()
return s, nil
}