Handle requests concurrently

This commit is contained in:
2022-08-06 22:52:58 -07:00
parent d35a16ec64
commit 7592eae318
2 changed files with 72 additions and 70 deletions

View File

@@ -122,7 +122,7 @@ func TestCodecs(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("codec/%s: %v", name, err) t.Errorf("codec/%s: %v", name, err)
} }
if add != 4 { if add != 4 {
t.Errorf("codec/%s: add: expected 4, got %d", name, add) t.Errorf("codec/%s: add: expected 4, got %d", name, add)
} }

View File

@@ -130,9 +130,8 @@ func (s *Server) execute(pCtx context.Context, typ string, name string, data []b
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
arg = argVal.Elem().Interface() arg = argVal.Elem().Interface()
ctx = newContext(pCtx, c) ctx = newContext(pCtx, c)
// Get reflect value of context // Get reflect value of context
@@ -304,87 +303,90 @@ func (s *Server) handleConn(pCtx context.Context, c codec.Codec) {
continue continue
} }
// Execute decoded call go func() {
val, ctx, err := s.execute( // Execute decoded call
pCtx, val, ctx, err := s.execute(
call.Receiver, pCtx,
call.Method, call.Receiver,
call.Arg, call.Method,
c, call.Arg,
) c,
if err != nil { )
s.sendErr(c, call, val, err)
} else {
valData, err := c.Marshal(val)
if err != nil { if err != nil {
s.sendErr(c, call, val, err) s.sendErr(c, call, val, err)
continue } else {
} valData, err := c.Marshal(val)
// Create response
res := types.Response{
ID: call.ID,
Return: valData,
}
// If function has created a channel
if ctx.isChannel {
idData, err := c.Marshal(ctx.channelID)
if err != nil { if err != nil {
s.sendErr(c, call, val, err) s.sendErr(c, call, val, err)
continue return
} }
// Set IsChannel to true // Create response
res.Type = types.ResponseTypeChannel res := types.Response{
// Overwrite return value with channel ID ID: call.ID,
res.Return = idData Return: valData,
}
// Store context in map for future use // If function has created a channel
s.contextsMtx.Lock() if ctx.isChannel {
s.contexts[ctx.channelID] = ctx idData, err := c.Marshal(ctx.channelID)
s.contextsMtx.Unlock() if err != nil {
s.sendErr(c, call, val, err)
go func() { return
// For every value received from channel
for val := range ctx.channel {
codecMtx.Lock()
valData, err := c.Marshal(val)
if err != nil {
continue
}
// Encode response using codec
c.Encode(types.Response{
ID: ctx.channelID,
Return: valData,
})
codecMtx.Unlock()
} }
// Cancel context // Set IsChannel to true
ctx.cancel() res.Type = types.ResponseTypeChannel
// Delete context from map // Overwrite return value with channel ID
res.Return = idData
// Store context in map for future use
s.contextsMtx.Lock() s.contextsMtx.Lock()
delete(s.contexts, ctx.channelID) s.contexts[ctx.channelID] = ctx
s.contextsMtx.Unlock() s.contextsMtx.Unlock()
codecMtx.Lock() go func() {
c.Encode(types.Response{ // For every value received from channel
Type: types.ResponseTypeChannelDone, for val := range ctx.channel {
ID: ctx.channelID, codecMtx.Lock()
})
codecMtx.Unlock() valData, err := c.Marshal(val)
}() if err != nil {
continue
}
// Encode response using codec
c.Encode(types.Response{
ID: ctx.channelID,
Return: valData,
})
codecMtx.Unlock()
}
// Cancel context
ctx.cancel()
// Delete context from map
s.contextsMtx.Lock()
delete(s.contexts, ctx.channelID)
s.contextsMtx.Unlock()
codecMtx.Lock()
c.Encode(types.Response{
Type: types.ResponseTypeChannelDone,
ID: ctx.channelID,
})
codecMtx.Unlock()
}()
}
// Encode response using codec
codecMtx.Lock()
c.Encode(res)
codecMtx.Unlock()
} }
// Encode response using codec }()
codecMtx.Lock()
c.Encode(res)
codecMtx.Unlock()
}
} }
} }