Skip to content

Commit

Permalink
simplify StreamExecute
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <andres@planetscale.com>
  • Loading branch information
systay committed Nov 10, 2020
1 parent 8e845a7 commit 298edc9
Showing 1 changed file with 36 additions and 43 deletions.
79 changes: 36 additions & 43 deletions go/vt/vtgate/engine/concatenate.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,56 +130,49 @@ func (c *Concatenate) StreamExecute(vcursor VCursor, bindVars map[string]*queryp

g := vcursor.ErrorGroupCancellableContext()
fieldset.Add(1)
var mu sync.Mutex
var cbMu, visitFieldsMu sync.Mutex

visitFields := func(fields []*querypb.Field) (unlocked bool, err error) {
visitFieldsMu.Lock()
defer visitFieldsMu.Unlock()
if seenFields == nil {
seenFields = fields
return true, nil
}
return false, compareFields(fields, seenFields)
}

fieldsSent := false
g.Go(func() error {
err := c.LHS.StreamExecute(vcursor, bindVars, wantfields, func(resultChunk *sqltypes.Result) error {
if !fieldsSent {
defer fieldset.Done()
seenFields = resultChunk.Fields
fieldsSent = true
// No other call can happen before this call.
return callback(resultChunk)
visitor := func(resultChunk *sqltypes.Result) error {
if resultChunk.Fields != nil {
var err error
unlocked, err := visitFields(resultChunk.Fields)
if err != nil {
return err
}
// This to ensure only one send happens back to the client.
mu.Lock()
defer mu.Unlock()
select {
case <-vcursor.Context().Done():
return nil
default:
if unlocked {
defer fieldset.Done()
// nothing else will be sent to the client until we send out this first one
return callback(resultChunk)
}
})
// This is to ensure other streams complete if the first stream failed to unlock the wait.
if !fieldsSent {
fieldset.Done()
}
return err
fieldset.Wait()

// This to ensure only one send happens back to the client.
cbMu.Lock()
defer cbMu.Unlock()

select {
case <-vcursor.Context().Done():
return nil
default:
return callback(resultChunk)
}
}
g.Go(func() error {
return c.LHS.StreamExecute(vcursor, bindVars, wantfields, visitor)
})
g.Go(func() error {
err := c.RHS.StreamExecute(vcursor, bindVars, wantfields, func(resultChunk *sqltypes.Result) error {
fieldset.Wait()

if resultChunk.Fields != nil {
err := compareFields(seenFields, resultChunk.Fields)
if err != nil {
return err
}
}

// This to ensure only one send happens back to the client.
mu.Lock()
defer mu.Unlock()
select {
case <-vcursor.Context().Done():
return nil
default:
return callback(resultChunk)
}
})
return err
return c.RHS.StreamExecute(vcursor, bindVars, wantfields, visitor)
})
if err := g.Wait(); err != nil {
return err
Expand Down

0 comments on commit 298edc9

Please sign in to comment.