Initial Commit
This commit is contained in:
commit
b0c20e01dc
10
go.mod
Normal file
10
go.mod
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
module go.arsenm.dev/drpc
|
||||||
|
|
||||||
|
go 1.18
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/hashicorp/yamux v0.1.1
|
||||||
|
storj.io/drpc v0.0.32
|
||||||
|
)
|
||||||
|
|
||||||
|
require github.com/zeebo/errs v1.2.2 // indirect
|
7
go.sum
Normal file
7
go.sum
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
github.com/hashicorp/yamux v0.1.1 h1:yrQxtgseBDrq9Y652vSRDvsKCJKOUD+GzTS4Y0Y8pvE=
|
||||||
|
github.com/hashicorp/yamux v0.1.1/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ=
|
||||||
|
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
|
||||||
|
github.com/zeebo/errs v1.2.2 h1:5NFypMTuSdoySVTqlNs1dEoU21QVamMQJxW/Fii5O7g=
|
||||||
|
github.com/zeebo/errs v1.2.2/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
|
||||||
|
storj.io/drpc v0.0.32 h1:5p5ZwsK/VOgapaCu+oxaPVwO6UwIs+iwdMiD50+R4PI=
|
||||||
|
storj.io/drpc v0.0.32/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg=
|
90
muxconn/muxconn.go
Normal file
90
muxconn/muxconn.go
Normal file
@ -0,0 +1,90 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/hashicorp/yamux"
|
||||||
|
"storj.io/drpc"
|
||||||
|
"storj.io/drpc/drpcconn"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ drpc.Conn = &Conn{}
|
||||||
|
|
||||||
|
// Conn implements drpc.Conn using the yamux
|
||||||
|
// multiplexer to allow concurrent RPCs
|
||||||
|
type Conn struct {
|
||||||
|
conn io.ReadWriteCloser
|
||||||
|
sess *yamux.Session
|
||||||
|
closed chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// New returns a new multiplexed DRPC connection
|
||||||
|
func New(conn io.ReadWriteCloser) (*Conn, error) {
|
||||||
|
sess, err := yamux.Client(conn, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Conn{
|
||||||
|
conn: conn,
|
||||||
|
sess: sess,
|
||||||
|
closed: make(chan struct{}),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the multiplexer session
|
||||||
|
// and the underlying connection.
|
||||||
|
func (m *Conn) Close() error {
|
||||||
|
defer close(m.closed)
|
||||||
|
|
||||||
|
err := m.sess.Close()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return m.conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Closed returns a channel that will be closed
|
||||||
|
// when the connection is closed
|
||||||
|
func (m *Conn) Closed() <-chan struct{} {
|
||||||
|
return m.closed
|
||||||
|
}
|
||||||
|
|
||||||
|
// Invoke issues the rpc on the transport serializing in, waits for a response, and deserializes it into out.
|
||||||
|
func (m *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) error {
|
||||||
|
conn, err := m.sess.Open()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
dconn := drpcconn.New(conn)
|
||||||
|
defer dconn.Close()
|
||||||
|
return dconn.Invoke(ctx, rpc, enc, in, out)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewStream begins a streaming rpc on the connection.
|
||||||
|
func (m *Conn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (drpc.Stream, error) {
|
||||||
|
conn, err := m.sess.Open()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
dconn := drpcconn.New(conn)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
<-dconn.Closed()
|
||||||
|
conn.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
s, err := dconn.NewStream(ctx, rpc, enc)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
<-s.Context().Done()
|
||||||
|
dconn.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
return s, nil
|
||||||
|
}
|
64
muxserver/muxserver.go
Normal file
64
muxserver/muxserver.go
Normal file
@ -0,0 +1,64 @@
|
|||||||
|
package muxserver
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
|
||||||
|
"github.com/hashicorp/yamux"
|
||||||
|
"storj.io/drpc"
|
||||||
|
"storj.io/drpc/drpcserver"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Server is a DRPC server that handles
|
||||||
|
// multiplexed streams
|
||||||
|
type Server struct {
|
||||||
|
srv *drpcserver.Server
|
||||||
|
}
|
||||||
|
|
||||||
|
// New returns a new multiplexed Server that serves handler
|
||||||
|
func New(handler drpc.Handler) *Server {
|
||||||
|
return &Server{srv: drpcserver.New(handler)}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewWithOptions is the same as New but passes options to DRPC
|
||||||
|
func NewWithOptions(handler drpc.Handler, opts drpcserver.Options) *Server {
|
||||||
|
return &Server{srv: drpcserver.NewWithOptions(handler, opts)}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Serve listens on the given listener and handles all multiplexed
|
||||||
|
// streams.
|
||||||
|
func (s *Server) Serve(ctx context.Context, ln net.Listener) error {
|
||||||
|
for {
|
||||||
|
conn, err := ln.Accept()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
sess, err := yamux.Server(conn, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
go s.handleSession(ctx, sess)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleSession(ctx context.Context, sess *yamux.Session) {
|
||||||
|
for {
|
||||||
|
conn, err := sess.Accept()
|
||||||
|
if errors.Is(err, io.EOF) {
|
||||||
|
break
|
||||||
|
} else if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
go s.srv.ServeOne(ctx, conn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServeOne serves a single set of rpcs on the provided transport.
|
||||||
|
func (s *Server) ServeOne(ctx context.Context, conn io.ReadWriteCloser) error {
|
||||||
|
return s.srv.ServeOne(ctx, conn)
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user