diff --git a/muxconn/muxconn.go b/muxconn/muxconn.go index d42c9f1..adc9901 100644 --- a/muxconn/muxconn.go +++ b/muxconn/muxconn.go @@ -2,6 +2,7 @@ package muxconn import ( "context" + "errors" "io" "github.com/hashicorp/yamux" @@ -9,6 +10,8 @@ import ( "storj.io/drpc/drpcconn" ) +var ErrClosed = errors.New("connection closed") + var _ drpc.Conn = &Conn{} // Conn implements drpc.Conn using the yamux @@ -59,6 +62,10 @@ func (m *Conn) Closed() <-chan struct{} { // Invoke issues the rpc on the transport serializing in, waits for a response, and deserializes it into out. func (m *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) error { + if m.isClosed { + return ErrClosed + } + conn, err := m.sess.Open() if err != nil { return err @@ -71,6 +78,10 @@ func (m *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, ou // NewStream begins a streaming rpc on the connection. func (m *Conn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (drpc.Stream, error) { + if m.isClosed { + return nil, ErrClosed + } + conn, err := m.sess.Open() if err != nil { return nil, err