Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[9.0] Memory Sort to close the goroutines when callback returns error #8040

Merged
merged 1 commit into from
May 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions go/vt/vtgate/engine/merge_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (ms *MergeSort) StreamExecute(vcursor VCursor, bindVars map[string]*querypb

handles := make([]*streamHandle, len(ms.Primitives))
for i, input := range ms.Primitives {
handles[i] = runOneStream(vcursor, input, bindVars, wantfields)
handles[i] = runOneStream(ctx, vcursor, input, bindVars, wantfields)
// Need fields only from first handle, if wantfields was true.
wantfields = false
}
Expand Down Expand Up @@ -183,12 +183,11 @@ type streamHandle struct {
}

// runOnestream starts a streaming query on one shard, and returns a streamHandle for it.
func runOneStream(vcursor VCursor, input StreamExecutor, bindVars map[string]*querypb.BindVariable, wantfields bool) *streamHandle {
func runOneStream(ctx context.Context, vcursor VCursor, input StreamExecutor, bindVars map[string]*querypb.BindVariable, wantfields bool) *streamHandle {
handle := &streamHandle{
fields: make(chan []*querypb.Field, 1),
row: make(chan []sqltypes.Value, 10),
}
ctx := vcursor.Context()

go func() {
defer close(handle.fields)
Expand Down
36 changes: 36 additions & 0 deletions go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package vtgate
import (
"fmt"
"reflect"
"runtime"
"strings"
"testing"
"time"

"vitess.io/vitess/go/test/utils"

Expand Down Expand Up @@ -2420,3 +2422,37 @@ func TestSelectFromInformationSchema(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, sbc1.StringQueries(), []string{"select * from INFORMATION_SCHEMA.`TABLES` where TABLE_SCHEMA = :__vtschemaname"})
}

func TestStreamOrderByLimitWithMultipleResults(t *testing.T) {
// Special setup: Don't use createLegacyExecutorEnv.
cell := "aa"
hc := discovery.NewFakeHealthCheck()
s := createSandbox("TestExecutor")
s.VSchema = executorVSchema
getSandbox(KsTestUnsharded).VSchema = unshardedVSchema
serv := new(sandboxTopo)
resolver := newTestResolver(hc, serv, cell)
shards := []string{"-20", "20-40", "40-60", "60-80", "80-a0", "a0-c0", "c0-e0", "e0-"}
count := 1
for _, shard := range shards {
sbc := hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_MASTER, true, 1, nil)
sbc.SetResults([]*sqltypes.Result{
sqltypes.MakeTestResult(sqltypes.MakeTestFields("id|col", "int32|int32"), fmt.Sprintf("%d|%d", count, count)),
sqltypes.MakeTestResult(sqltypes.MakeTestFields("id|col", "int32|int32"), fmt.Sprintf("%d|%d", count+10, count)),
})
count++
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, testCacheSize)
before := runtime.NumGoroutine()

query := "select id, col from user order by id limit 2"
gotResult, err := executorStream(executor, query)
require.NoError(t, err)

wantResult := sqltypes.MakeTestResult(sqltypes.MakeTestFields("id|col", "int32|int32"), "1|1", "2|2")
wantResult.RowsAffected = 0
utils.MustMatch(t, wantResult, gotResult)
// some sleep to close all goroutines.
time.Sleep(100 * time.Millisecond)
assert.GreaterOrEqual(t, before, runtime.NumGoroutine(), "left open goroutines lingering")
}
18 changes: 16 additions & 2 deletions go/vt/vttablet/sandboxconn/sandboxconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,24 @@ func (sbc *SandboxConn) StreamExecute(ctx context.Context, target *querypb.Targe
sbc.sExecMu.Unlock()
return err
}
nextRs := sbc.getNextResult()
if sbc.results == nil {
nextRs := sbc.getNextResult()
sbc.sExecMu.Unlock()
return callback(nextRs)
}

for len(sbc.results) > 0 {
nextRs := sbc.getNextResult()
sbc.sExecMu.Unlock()
err := callback(nextRs)
if err != nil {
return err
}
sbc.sExecMu.Lock()
}
sbc.sExecMu.Unlock()

return callback(nextRs)
return nil
}

// Begin is part of the QueryService interface.
Expand Down