forked from Elara6331/itd
		
	Add connection multiplexing, fixing itgui
This commit is contained in:
		
							
								
								
									
										13
									
								
								api/api.go
									
									
									
									
									
								
							
							
						
						
									
										13
									
								
								api/api.go
									
									
									
									
									
								
							| @@ -5,13 +5,14 @@ import ( | |||||||
| 	"net" | 	"net" | ||||||
|  |  | ||||||
| 	"go.arsenm.dev/itd/internal/rpc" | 	"go.arsenm.dev/itd/internal/rpc" | ||||||
|  | 	"storj.io/drpc" | ||||||
| 	"storj.io/drpc/drpcconn" | 	"storj.io/drpc/drpcconn" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const DefaultAddr = "/tmp/itd/socket" | const DefaultAddr = "/tmp/itd/socket" | ||||||
|  |  | ||||||
| type Client struct { | type Client struct { | ||||||
| 	conn   *drpcconn.Conn | 	conn   drpc.Conn | ||||||
| 	client rpc.DRPCITDClient | 	client rpc.DRPCITDClient | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -20,11 +21,15 @@ func New(sockPath string) (*Client, error) { | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 	dconn := drpcconn.New(conn) |  | ||||||
|  | 	mconn, err := newMuxConn(conn) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	return &Client{ | 	return &Client{ | ||||||
| 		conn:   dconn, | 		conn:   mconn, | ||||||
| 		client: rpc.NewDRPCITDClient(dconn), | 		client: rpc.NewDRPCITDClient(mconn), | ||||||
| 	}, nil | 	}, nil | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
							
								
								
									
										65
									
								
								api/drpc.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										65
									
								
								api/drpc.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,65 @@ | |||||||
|  | package api | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"io" | ||||||
|  |  | ||||||
|  | 	"github.com/hashicorp/yamux" | ||||||
|  | 	"storj.io/drpc" | ||||||
|  | 	"storj.io/drpc/drpcconn" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | var _ drpc.Conn = &muxConn{} | ||||||
|  |  | ||||||
|  | type muxConn struct { | ||||||
|  | 	conn   io.ReadWriteCloser | ||||||
|  | 	sess   *yamux.Session | ||||||
|  | 	closed chan struct{} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func newMuxConn(conn io.ReadWriteCloser) (*muxConn, error) { | ||||||
|  | 	sess, err := yamux.Client(conn, nil) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return &muxConn{ | ||||||
|  | 		conn:   conn, | ||||||
|  | 		sess:   sess, | ||||||
|  | 		closed: make(chan struct{}), | ||||||
|  | 	}, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *muxConn) Close() error { | ||||||
|  | 	defer close(m.closed) | ||||||
|  |  | ||||||
|  | 	err := m.sess.Close() | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	return m.conn.Close() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *muxConn) Closed() <-chan struct{} { | ||||||
|  | 	return m.closed | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *muxConn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) error { | ||||||
|  | 	conn, err := m.sess.Open() | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	defer conn.Close() | ||||||
|  | 	dconn := drpcconn.New(conn) | ||||||
|  | 	return dconn.Invoke(ctx, rpc, enc, in, out) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *muxConn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (drpc.Stream, error) { | ||||||
|  | 	conn, err := m.sess.Open() | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	dconn := drpcconn.New(conn) | ||||||
|  | 	return dconn.NewStream(ctx, rpc, enc) | ||||||
|  | } | ||||||
							
								
								
									
										1
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										1
									
								
								go.mod
									
									
									
									
									
								
							| @@ -10,6 +10,7 @@ require ( | |||||||
| 	github.com/cheggaaa/pb/v3 v3.1.0 | 	github.com/cheggaaa/pb/v3 v3.1.0 | ||||||
| 	github.com/gen2brain/dlgs v0.0.0-20220603100644-40c77870fa8d | 	github.com/gen2brain/dlgs v0.0.0-20220603100644-40c77870fa8d | ||||||
| 	github.com/godbus/dbus/v5 v5.1.0 | 	github.com/godbus/dbus/v5 v5.1.0 | ||||||
|  | 	github.com/hashicorp/yamux v0.1.1 | ||||||
| 	github.com/knadh/koanf v1.4.4 | 	github.com/knadh/koanf v1.4.4 | ||||||
| 	github.com/mattn/go-isatty v0.0.17 | 	github.com/mattn/go-isatty v0.0.17 | ||||||
| 	github.com/mozillazg/go-pinyin v0.19.0 | 	github.com/mozillazg/go-pinyin v0.19.0 | ||||||
|   | |||||||
							
								
								
									
										2
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.sum
									
									
									
									
									
								
							| @@ -297,6 +297,8 @@ github.com/hashicorp/vault/api v1.0.4/go.mod h1:gDcqh3WGcR1cpF5AJz/B1UFheUEneMoI | |||||||
| github.com/hashicorp/vault/sdk v0.1.13/go.mod h1:B+hVj7TpuQY1Y/GPbCpffmgd+tSEwvhkWnjtSYCaS2M= | github.com/hashicorp/vault/sdk v0.1.13/go.mod h1:B+hVj7TpuQY1Y/GPbCpffmgd+tSEwvhkWnjtSYCaS2M= | ||||||
| github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= | github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= | ||||||
| github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= | github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= | ||||||
|  | github.com/hashicorp/yamux v0.1.1 h1:yrQxtgseBDrq9Y652vSRDvsKCJKOUD+GzTS4Y0Y8pvE= | ||||||
|  | github.com/hashicorp/yamux v0.1.1/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ= | ||||||
| github.com/hjson/hjson-go/v4 v4.0.0 h1:wlm6IYYqHjOdXH1gHev4VoXCaW20HdQAGCxdOEEg2cs= | github.com/hjson/hjson-go/v4 v4.0.0 h1:wlm6IYYqHjOdXH1gHev4VoXCaW20HdQAGCxdOEEg2cs= | ||||||
| github.com/hjson/hjson-go/v4 v4.0.0/go.mod h1:KaYt3bTw3zhBjYqnXkYywcYctk0A2nxeEFTse3rH13E= | github.com/hjson/hjson-go/v4 v4.0.0/go.mod h1:KaYt3bTw3zhBjYqnXkYywcYctk0A2nxeEFTse3rH13E= | ||||||
| github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= | github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= | ||||||
|   | |||||||
							
								
								
									
										26
									
								
								socket.go
									
									
									
									
									
								
							
							
						
						
									
										26
									
								
								socket.go
									
									
									
									
									
								
							| @@ -27,6 +27,7 @@ import ( | |||||||
| 	"path/filepath" | 	"path/filepath" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/hashicorp/yamux" | ||||||
| 	"github.com/rs/zerolog/log" | 	"github.com/rs/zerolog/log" | ||||||
| 	"go.arsenm.dev/infinitime" | 	"go.arsenm.dev/infinitime" | ||||||
| 	"go.arsenm.dev/infinitime/blefs" | 	"go.arsenm.dev/infinitime/blefs" | ||||||
| @@ -79,7 +80,30 @@ func startSocket(ctx context.Context, dev *infinitime.Device) error { | |||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	go drpcserver.New(mux).Serve(ctx, ln) | 	srv := drpcserver.New(mux) | ||||||
|  |  | ||||||
|  | 	go func() { | ||||||
|  | 		for { | ||||||
|  | 			conn, err := ln.Accept() | ||||||
|  | 			if err != nil { | ||||||
|  | 				log.Fatal().Err(err).Msg("Error accepting connection") | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			sess, err := yamux.Server(conn, nil) | ||||||
|  | 			if err != nil { | ||||||
|  | 				log.Fatal().Err(err).Msg("Error creating multiplexed session") | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			for { | ||||||
|  | 				conn, err := sess.Accept() | ||||||
|  | 				if err != nil { | ||||||
|  | 					log.Fatal().Err(err).Msg("Error accepting stream") | ||||||
|  | 				} | ||||||
|  |  | ||||||
|  | 				go srv.ServeOne(ctx, conn) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  |  | ||||||
| 	// Log socket start | 	// Log socket start | ||||||
| 	log.Info().Str("path", k.String("socket.path")).Msg("Started control socket") | 	log.Info().Str("path", k.String("socket.path")).Msg("Started control socket") | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user