From b0c20e01dce2e2e5d45305205bd90a63ca35afbe Mon Sep 17 00:00:00 2001 From: Elara Musayelyan Date: Tue, 3 Jan 2023 12:56:09 -0800 Subject: [PATCH] Initial Commit --- go.mod | 10 +++++ go.sum | 7 ++++ muxconn/muxconn.go | 90 ++++++++++++++++++++++++++++++++++++++++++ muxserver/muxserver.go | 64 ++++++++++++++++++++++++++++++ 4 files changed, 171 insertions(+) create mode 100644 go.mod create mode 100644 go.sum create mode 100644 muxconn/muxconn.go create mode 100644 muxserver/muxserver.go diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..04538e0 --- /dev/null +++ b/go.mod @@ -0,0 +1,10 @@ +module go.arsenm.dev/drpc + +go 1.18 + +require ( + github.com/hashicorp/yamux v0.1.1 + storj.io/drpc v0.0.32 +) + +require github.com/zeebo/errs v1.2.2 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..f501af4 --- /dev/null +++ b/go.sum @@ -0,0 +1,7 @@ +github.com/hashicorp/yamux v0.1.1 h1:yrQxtgseBDrq9Y652vSRDvsKCJKOUD+GzTS4Y0Y8pvE= +github.com/hashicorp/yamux v0.1.1/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/errs v1.2.2 h1:5NFypMTuSdoySVTqlNs1dEoU21QVamMQJxW/Fii5O7g= +github.com/zeebo/errs v1.2.2/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4= +storj.io/drpc v0.0.32 h1:5p5ZwsK/VOgapaCu+oxaPVwO6UwIs+iwdMiD50+R4PI= +storj.io/drpc v0.0.32/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg= diff --git a/muxconn/muxconn.go b/muxconn/muxconn.go new file mode 100644 index 0000000..11f88d7 --- /dev/null +++ b/muxconn/muxconn.go @@ -0,0 +1,90 @@ +package api + +import ( + "context" + "io" + + "github.com/hashicorp/yamux" + "storj.io/drpc" + "storj.io/drpc/drpcconn" +) + +var _ drpc.Conn = &Conn{} + +// Conn implements drpc.Conn using the yamux +// multiplexer to allow concurrent RPCs +type Conn struct { + conn io.ReadWriteCloser + sess *yamux.Session + closed chan struct{} +} + +// New returns a new multiplexed DRPC connection +func New(conn io.ReadWriteCloser) (*Conn, error) { + sess, err := yamux.Client(conn, nil) + if err != nil { + return nil, err + } + + return &Conn{ + conn: conn, + sess: sess, + closed: make(chan struct{}), + }, nil +} + +// Close closes the multiplexer session +// and the underlying connection. +func (m *Conn) Close() error { + defer close(m.closed) + + err := m.sess.Close() + if err != nil { + return err + } + return m.conn.Close() +} + +// Closed returns a channel that will be closed +// when the connection is closed +func (m *Conn) Closed() <-chan struct{} { + return m.closed +} + +// Invoke issues the rpc on the transport serializing in, waits for a response, and deserializes it into out. +func (m *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) error { + conn, err := m.sess.Open() + if err != nil { + return err + } + defer conn.Close() + dconn := drpcconn.New(conn) + defer dconn.Close() + return dconn.Invoke(ctx, rpc, enc, in, out) +} + +// NewStream begins a streaming rpc on the connection. +func (m *Conn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (drpc.Stream, error) { + conn, err := m.sess.Open() + if err != nil { + return nil, err + } + dconn := drpcconn.New(conn) + + go func() { + <-dconn.Closed() + conn.Close() + }() + + s, err := dconn.NewStream(ctx, rpc, enc) + if err != nil { + return nil, err + } + + go func() { + <-s.Context().Done() + dconn.Close() + }() + + return s, nil +} diff --git a/muxserver/muxserver.go b/muxserver/muxserver.go new file mode 100644 index 0000000..d9a8c2d --- /dev/null +++ b/muxserver/muxserver.go @@ -0,0 +1,64 @@ +package muxserver + +import ( + "context" + "errors" + "io" + "net" + + "github.com/hashicorp/yamux" + "storj.io/drpc" + "storj.io/drpc/drpcserver" +) + +// Server is a DRPC server that handles +// multiplexed streams +type Server struct { + srv *drpcserver.Server +} + +// New returns a new multiplexed Server that serves handler +func New(handler drpc.Handler) *Server { + return &Server{srv: drpcserver.New(handler)} +} + +// NewWithOptions is the same as New but passes options to DRPC +func NewWithOptions(handler drpc.Handler, opts drpcserver.Options) *Server { + return &Server{srv: drpcserver.NewWithOptions(handler, opts)} +} + +// Serve listens on the given listener and handles all multiplexed +// streams. +func (s *Server) Serve(ctx context.Context, ln net.Listener) error { + for { + conn, err := ln.Accept() + if err != nil { + return err + } + + sess, err := yamux.Server(conn, nil) + if err != nil { + return err + } + + go s.handleSession(ctx, sess) + } +} + +func (s *Server) handleSession(ctx context.Context, sess *yamux.Session) { + for { + conn, err := sess.Accept() + if errors.Is(err, io.EOF) { + break + } else if err != nil { + continue + } + + go s.srv.ServeOne(ctx, conn) + } +} + +// ServeOne serves a single set of rpcs on the provided transport. +func (s *Server) ServeOne(ctx context.Context, conn io.ReadWriteCloser) error { + return s.srv.ServeOne(ctx, conn) +}