Skip to content

Commit

Permalink
distsql,util: Add executor open close time into its total consume time (
Browse files Browse the repository at this point in the history
#56236)

ref #56232, close #56233
  • Loading branch information
yibin authored Nov 18, 2024
1 parent 06ac7b1 commit e731f1b
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 18 deletions.
13 changes: 5 additions & 8 deletions pkg/bindinfo/session_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -336,14 +337,10 @@ func TestIssue19836(t *testing.T) {
tk.MustExec("set @a=1;")
tk.MustExec("set @b=2;")
tk.MustExec("EXECUTE stmt USING @a, @b;")
explainResult := testkit.Rows(
"Limit_8 2.00 0 root time:0s, loops:0 offset:1, count:2 N/A N/A",
"└─TableReader_13 3.00 0 root time:0s, loops:0 data:Limit_12 N/A N/A",
" └─Limit_12 3.00 0 cop[tikv] offset:0, count:3 N/A N/A",
" └─Selection_11 3.00 0 cop[tikv] eq(test.t.a, 40) N/A N/A",
" └─TableFullScan_10 3000.00 0 cop[tikv] table:t keep order:false, stats:pseudo N/A N/A",
)
tk.MustQuery("explain for connection " + strconv.FormatUint(tk.Session().ShowProcess().ID, 10)).Check(explainResult)
result := tk.MustQuery("explain for connection " + strconv.FormatUint(tk.Session().ShowProcess().ID, 10)).String()
// Don't check the whole output here, since the explain for connection output contains execution time, which is not fixed
// after we including open/close time in it.
require.True(t, strings.Contains(result, "TableFullScan"))
}

func TestDropSingleBindings(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions pkg/executor/internal/exec/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,10 @@ func Open(ctx context.Context, e Executor) (err error) {
err = util.GetRecoverError(r)
}
}()
if e.RuntimeStats() != nil {
start := time.Now()
defer func() { e.RuntimeStats().RecordOpen(time.Since(start)) }()
}
return e.Open(ctx)
}

Expand Down Expand Up @@ -469,5 +473,9 @@ func Close(e Executor) (err error) {
err = util.GetRecoverError(r)
}
}()
if e.RuntimeStats() != nil {
start := time.Now()
defer func() { e.RuntimeStats().RecordClose(time.Since(start)) }()
}
return e.Close()
}
11 changes: 4 additions & 7 deletions pkg/expression/integration_test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,13 +372,10 @@ func TestVectorConstantExplain(t *testing.T) {
require.NoError(t, err)
fmt.Println(planTree)
fmt.Println("++++")
require.Equal(t, strings.Join([]string{
` id task estRows operator info actRows execution info memory disk`,
` Projection_3 root 10000 vec_cosine_distance(test.t.c, cast([100,100,100,100,100,100,100,100,100,100,100,100,100,100,100,100...(len:401), vector))->Column#3 0 time:0s, loops:0 0 Bytes N/A`,
` └─TableReader_5 root 10000 data:TableFullScan_4 0 time:0s, loops:0 0 Bytes N/A`,
` └─TableFullScan_4 cop[tikv] 10000 table:t, keep order:false, stats:pseudo 0 N/A N/A`,
}, "\n"), planTree)

// Don't check planTree directly, because it contains execution time info which is not fixed after open/close time is included
require.True(t, strings.Contains(planTree, ` Projection_3 root 10000 vec_cosine_distance(test.t.c, cast([100,100,100,100,100,100,100,100,100,100,100,100,100,100,100,100...(len:401), vector))->Column#3`))
require.True(t, strings.Contains(planTree, ` └─TableReader_5 root 10000 data:TableFullScan_4`))
require.True(t, strings.Contains(planTree, ` └─TableFullScan_4 cop[tikv] 10000 table:t, keep order:false, stats:pseudo`))
// No need to check result at all.
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs))
}
Expand Down
35 changes: 33 additions & 2 deletions pkg/util/execdetails/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -1305,8 +1305,12 @@ func (waitSummary *TiFlashWaitSummary) CanBeIgnored() bool {
type BasicRuntimeStats struct {
// executor's Next() called times.
loop atomic.Int32
// executor consume time.
// executor consume time, including open, next, and close time.
consume atomic.Int64
// executor open time.
open atomic.Int64
// executor close time.
close atomic.Int64
// executor return row count.
rows atomic.Int64
}
Expand All @@ -1321,6 +1325,8 @@ func (e *BasicRuntimeStats) Clone() RuntimeStats {
result := &BasicRuntimeStats{}
result.loop.Store(e.loop.Load())
result.consume.Store(e.consume.Load())
result.open.Store(e.open.Load())
result.close.Store(e.close.Load())
result.rows.Store(e.rows.Load())
return result
}
Expand All @@ -1333,6 +1339,8 @@ func (e *BasicRuntimeStats) Merge(rs RuntimeStats) {
}
e.loop.Add(tmp.loop.Load())
e.consume.Add(tmp.consume.Load())
e.open.Add(tmp.open.Load())
e.close.Add(tmp.close.Load())
e.rows.Add(tmp.rows.Load())
}

Expand Down Expand Up @@ -1388,6 +1396,18 @@ func (e *BasicRuntimeStats) Record(d time.Duration, rowNum int) {
e.rows.Add(int64(rowNum))
}

// RecordOpen records executor's open time.
func (e *BasicRuntimeStats) RecordOpen(d time.Duration) {
e.consume.Add(int64(d))
e.open.Add(int64(d))
}

// RecordClose records executor's close time.
func (e *BasicRuntimeStats) RecordClose(d time.Duration) {
e.consume.Add(int64(d))
e.close.Add(int64(d))
}

// SetRowNum sets the row num.
func (e *BasicRuntimeStats) SetRowNum(rowNum int64) {
e.rows.Store(rowNum)
Expand All @@ -1399,8 +1419,19 @@ func (e *BasicRuntimeStats) String() string {
return ""
}
var str strings.Builder
totalTime := e.consume.Load()
openTime := e.open.Load()
closeTime := e.close.Load()
str.WriteString("time:")
str.WriteString(FormatDuration(time.Duration(e.consume.Load())))
str.WriteString(FormatDuration(time.Duration(totalTime)))
if openTime >= int64(time.Millisecond) {
str.WriteString(", open:")
str.WriteString(FormatDuration(time.Duration(openTime)))
}
if closeTime >= int64(time.Millisecond) {
str.WriteString(", close:")
str.WriteString(FormatDuration(time.Duration(closeTime)))
}
str.WriteString(", loops:")
str.WriteString(strconv.FormatInt(int64(e.loop.Load()), 10))
return str.String()
Expand Down
4 changes: 3 additions & 1 deletion pkg/util/execdetails/execdetails_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,10 @@ func TestRootRuntimeStats(t *testing.T) {
stmtStats := NewRuntimeStatsColl(nil)
basic1 := stmtStats.GetBasicRuntimeStats(pid)
basic2 := stmtStats.GetBasicRuntimeStats(pid)
basic1.RecordOpen(time.Millisecond * 10)
basic1.Record(time.Second, 20)
basic2.Record(time.Second*2, 30)
basic2.RecordClose(time.Millisecond * 100)
concurrency := &RuntimeStatsWithConcurrencyInfo{}
concurrency.SetConcurrencyInfo(NewConcurrencyInfo("worker", 15))
commitDetail := &util.CommitDetails{
Expand All @@ -472,7 +474,7 @@ func TestRootRuntimeStats(t *testing.T) {
Commit: commitDetail,
})
stats := stmtStats.GetRootStats(1)
expect := "time:3s, loops:2, worker:15, commit_txn: {prewrite:1s, get_commit_ts:1s, commit:1s, region_num:5, write_keys:3, write_byte:66, txn_retry:2}"
expect := "time:3.11s, open:10ms, close:100ms, loops:2, worker:15, commit_txn: {prewrite:1s, get_commit_ts:1s, commit:1s, region_num:5, write_keys:3, write_byte:66, txn_retry:2}"
require.Equal(t, expect, stats.String())
}

Expand Down

0 comments on commit e731f1b

Please sign in to comment.