2023-01-03 21:01:37 +00:00
|
|
|
package muxconn
|
2023-01-03 20:56:09 +00:00
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2023-01-04 22:12:10 +00:00
|
|
|
"errors"
|
2023-01-03 20:56:09 +00:00
|
|
|
"io"
|
|
|
|
|
|
|
|
"github.com/hashicorp/yamux"
|
|
|
|
"storj.io/drpc"
|
|
|
|
"storj.io/drpc/drpcconn"
|
|
|
|
)
|
|
|
|
|
2023-01-04 22:12:10 +00:00
|
|
|
var ErrClosed = errors.New("connection closed")
|
|
|
|
|
2023-01-03 20:56:09 +00:00
|
|
|
var _ drpc.Conn = &Conn{}
|
|
|
|
|
|
|
|
// Conn implements drpc.Conn using the yamux
|
|
|
|
// multiplexer to allow concurrent RPCs
|
|
|
|
type Conn struct {
|
2023-01-04 22:08:18 +00:00
|
|
|
conn io.ReadWriteCloser
|
|
|
|
sess *yamux.Session
|
|
|
|
isClosed bool
|
|
|
|
closed chan struct{}
|
2023-01-03 20:56:09 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// New returns a new multiplexed DRPC connection
|
|
|
|
func New(conn io.ReadWriteCloser) (*Conn, error) {
|
|
|
|
sess, err := yamux.Client(conn, nil)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return &Conn{
|
|
|
|
conn: conn,
|
|
|
|
sess: sess,
|
|
|
|
closed: make(chan struct{}),
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close closes the multiplexer session
|
|
|
|
// and the underlying connection.
|
|
|
|
func (m *Conn) Close() error {
|
2023-01-04 22:08:18 +00:00
|
|
|
if !m.isClosed {
|
|
|
|
m.isClosed = true
|
|
|
|
defer close(m.closed)
|
2023-01-03 20:56:09 +00:00
|
|
|
|
2023-01-04 22:08:18 +00:00
|
|
|
err := m.sess.Close()
|
|
|
|
if err != nil {
|
|
|
|
m.conn.Close()
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return m.conn.Close()
|
2023-01-03 20:56:09 +00:00
|
|
|
}
|
2023-01-04 22:08:18 +00:00
|
|
|
return nil
|
2023-01-03 20:56:09 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Closed returns a channel that will be closed
|
|
|
|
// when the connection is closed
|
|
|
|
func (m *Conn) Closed() <-chan struct{} {
|
|
|
|
return m.closed
|
|
|
|
}
|
|
|
|
|
|
|
|
// Invoke issues the rpc on the transport serializing in, waits for a response, and deserializes it into out.
|
|
|
|
func (m *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) error {
|
2023-01-04 22:12:10 +00:00
|
|
|
if m.isClosed {
|
|
|
|
return ErrClosed
|
|
|
|
}
|
|
|
|
|
2023-01-03 20:56:09 +00:00
|
|
|
conn, err := m.sess.Open()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
defer conn.Close()
|
|
|
|
dconn := drpcconn.New(conn)
|
|
|
|
defer dconn.Close()
|
|
|
|
return dconn.Invoke(ctx, rpc, enc, in, out)
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewStream begins a streaming rpc on the connection.
|
|
|
|
func (m *Conn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (drpc.Stream, error) {
|
2023-01-04 22:12:10 +00:00
|
|
|
if m.isClosed {
|
|
|
|
return nil, ErrClosed
|
|
|
|
}
|
|
|
|
|
2023-01-03 20:56:09 +00:00
|
|
|
conn, err := m.sess.Open()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
dconn := drpcconn.New(conn)
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
<-dconn.Closed()
|
|
|
|
conn.Close()
|
|
|
|
}()
|
|
|
|
|
|
|
|
s, err := dconn.NewStream(ctx, rpc, enc)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
<-s.Context().Done()
|
|
|
|
dconn.Close()
|
|
|
|
}()
|
|
|
|
|
|
|
|
return s, nil
|
|
|
|
}
|