Compare commits

..

No commits in common. "f1aa0f5c4f8fd004daad7624dbff9f1dd977ad56" and "6df8cf53c6e66183ecbfeb995256b685788b7c9f" have entirely different histories.

2 changed files with 26 additions and 24 deletions

View File

@ -1,7 +1,6 @@
package client package client
import ( import (
"context"
"errors" "errors"
"net" "net"
"reflect" "reflect"
@ -47,7 +46,7 @@ func New(conn net.Conn, cf codec.CodecFunc) *Client {
} }
// Call calls a method on the server // Call calls a method on the server
func (c *Client) Call(ctx context.Context, rcvr, method string, arg interface{}, ret interface{}) error { func (c *Client) Call(rcvr, method string, arg interface{}, ret interface{}) error {
// Create new v4 UUOD // Create new v4 UUOD
id, err := uuid.NewV4() id, err := uuid.NewV4()
if err != nil { if err != nil {
@ -55,8 +54,6 @@ func (c *Client) Call(ctx context.Context, rcvr, method string, arg interface{},
} }
idStr := id.String() idStr := id.String()
ctxDoneVal := reflect.ValueOf(ctx.Done())
// Create new channel using the generated ID // Create new channel using the generated ID
c.chMtx.Lock() c.chMtx.Lock()
c.chs[idStr] = make(chan *types.Response, 1) c.chs[idStr] = make(chan *types.Response, 1)
@ -109,12 +106,12 @@ func (c *Client) Call(ctx context.Context, rcvr, method string, arg interface{},
c.chs[chID] = make(chan *types.Response, 5) c.chs[chID] = make(chan *types.Response, 5)
c.chMtx.Unlock() c.chMtx.Unlock()
channelClosed := false
go func() { go func() {
// Get type of channel elements // Get type of channel elements
chElemType := retVal.Type().Elem() chElemType := retVal.Type().Elem()
// For every value received from channel // For every value received from channel
for val := range c.chs[chID] { for val := range c.chs[chID] {
//s := time.Now()
if val.ChannelDone { if val.ChannelDone {
// Close and delete channel // Close and delete channel
c.chMtx.Lock() c.chMtx.Lock()
@ -124,6 +121,9 @@ func (c *Client) Call(ctx context.Context, rcvr, method string, arg interface{},
// Close return channel // Close return channel
retVal.Close() retVal.Close()
channelClosed = true
break break
} }
// Get reflect value from channel response // Get reflect value from channel response
@ -139,20 +139,25 @@ func (c *Client) Call(ctx context.Context, rcvr, method string, arg interface{},
rVal = newVal rVal = newVal
} }
chosen, _, _ := reflect.Select([]reflect.SelectCase{ // Send value to channel
{Dir: reflect.SelectSend, Chan: retVal, Send: rVal}, retVal.Send(rVal)
{Dir: reflect.SelectRecv, Chan: ctxDoneVal, Send: reflect.Value{}}, }
}) }()
if chosen == 1 {
c.Call(context.Background(), "lrpc", "ChannelDone", chID, nil) go func() {
for {
val, ok := retVal.Recv()
if !ok && val.IsValid() {
break
}
}
if !channelClosed {
c.Call("lrpc", "ChannelDone", id, nil)
// Close and delete channel // Close and delete channel
c.chMtx.Lock() c.chMtx.Lock()
close(c.chs[chID]) close(c.chs[chID])
delete(c.chs, chID) delete(c.chs, chID)
c.chMtx.Unlock() c.chMtx.Unlock()
retVal.Close()
}
} }
}() }()
} else { } else {

View File

@ -1,7 +1,6 @@
package main package main
import ( import (
"context"
"encoding/gob" "encoding/gob"
"fmt" "fmt"
"net" "net"
@ -13,23 +12,21 @@ import (
func main() { func main() {
gob.Register([2]int{}) gob.Register([2]int{})
ctx := context.Background()
conn, _ := net.Dial("tcp", "localhost:9090") conn, _ := net.Dial("tcp", "localhost:9090")
c := client.New(conn, codec.Gob) c := client.New(conn, codec.Gob)
defer c.Close() defer c.Close()
var add int var add int
c.Call(ctx, "Arith", "Add", [2]int{5, 5}, &add) c.Call("Arith", "Add", [2]int{5, 5}, &add)
var sub int var sub int
c.Call(ctx, "Arith", "Sub", [2]int{5, 5}, &sub) c.Call("Arith", "Sub", [2]int{5, 5}, &sub)
var mul int var mul int
c.Call(ctx, "Arith", "Mul", [2]int{5, 5}, &mul) c.Call("Arith", "Mul", [2]int{5, 5}, &mul)
var div int var div int
c.Call(ctx, "Arith", "Div", [2]int{5, 5}, &div) c.Call("Arith", "Div", [2]int{5, 5}, &div)
fmt.Printf( fmt.Printf(
"add: %d, sub: %d, mul: %d, div: %d\n", "add: %d, sub: %d, mul: %d, div: %d\n",