Compare commits

..

No commits in common. "f1aa0f5c4f8fd004daad7624dbff9f1dd977ad56" and "6df8cf53c6e66183ecbfeb995256b685788b7c9f" have entirely different histories.

2 changed files with 26 additions and 24 deletions

View File

@ -1,7 +1,6 @@
package client
import (
"context"
"errors"
"net"
"reflect"
@ -47,7 +46,7 @@ func New(conn net.Conn, cf codec.CodecFunc) *Client {
}
// Call calls a method on the server
func (c *Client) Call(ctx context.Context, rcvr, method string, arg interface{}, ret interface{}) error {
func (c *Client) Call(rcvr, method string, arg interface{}, ret interface{}) error {
// Create new v4 UUOD
id, err := uuid.NewV4()
if err != nil {
@ -55,8 +54,6 @@ func (c *Client) Call(ctx context.Context, rcvr, method string, arg interface{},
}
idStr := id.String()
ctxDoneVal := reflect.ValueOf(ctx.Done())
// Create new channel using the generated ID
c.chMtx.Lock()
c.chs[idStr] = make(chan *types.Response, 1)
@ -109,12 +106,12 @@ func (c *Client) Call(ctx context.Context, rcvr, method string, arg interface{},
c.chs[chID] = make(chan *types.Response, 5)
c.chMtx.Unlock()
channelClosed := false
go func() {
// Get type of channel elements
chElemType := retVal.Type().Elem()
// For every value received from channel
for val := range c.chs[chID] {
//s := time.Now()
if val.ChannelDone {
// Close and delete channel
c.chMtx.Lock()
@ -124,6 +121,9 @@ func (c *Client) Call(ctx context.Context, rcvr, method string, arg interface{},
// Close return channel
retVal.Close()
channelClosed = true
break
}
// Get reflect value from channel response
@ -139,20 +139,25 @@ func (c *Client) Call(ctx context.Context, rcvr, method string, arg interface{},
rVal = newVal
}
chosen, _, _ := reflect.Select([]reflect.SelectCase{
{Dir: reflect.SelectSend, Chan: retVal, Send: rVal},
{Dir: reflect.SelectRecv, Chan: ctxDoneVal, Send: reflect.Value{}},
})
if chosen == 1 {
c.Call(context.Background(), "lrpc", "ChannelDone", chID, nil)
// Send value to channel
retVal.Send(rVal)
}
}()
go func() {
for {
val, ok := retVal.Recv()
if !ok && val.IsValid() {
break
}
}
if !channelClosed {
c.Call("lrpc", "ChannelDone", id, nil)
// Close and delete channel
c.chMtx.Lock()
close(c.chs[chID])
delete(c.chs, chID)
c.chMtx.Unlock()
retVal.Close()
}
}
}()
} else {

View File

@ -1,7 +1,6 @@
package main
import (
"context"
"encoding/gob"
"fmt"
"net"
@ -13,23 +12,21 @@ import (
func main() {
gob.Register([2]int{})
ctx := context.Background()
conn, _ := net.Dial("tcp", "localhost:9090")
c := client.New(conn, codec.Gob)
defer c.Close()
var add int
c.Call(ctx, "Arith", "Add", [2]int{5, 5}, &add)
c.Call("Arith", "Add", [2]int{5, 5}, &add)
var sub int
c.Call(ctx, "Arith", "Sub", [2]int{5, 5}, &sub)
c.Call("Arith", "Sub", [2]int{5, 5}, &sub)
var mul int
c.Call(ctx, "Arith", "Mul", [2]int{5, 5}, &mul)
c.Call("Arith", "Mul", [2]int{5, 5}, &mul)
var div int
c.Call(ctx, "Arith", "Div", [2]int{5, 5}, &div)
c.Call("Arith", "Div", [2]int{5, 5}, &div)
fmt.Printf(
"add: %d, sub: %d, mul: %d, div: %d\n",