Skip to content

Commit

Permalink
executor: fix race when collecting telemetry info of batch copr (ping…
Browse files Browse the repository at this point in the history
  • Loading branch information
you06 authored and ghazalfamilyusa committed Feb 15, 2023
1 parent 652641d commit 663478d
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 5 deletions.
2 changes: 1 addition & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ type TelemetryInfo struct {
PartitionTelemetry *PartitionTelemetryInfo
AccountLockTelemetry *AccountLockTelemetryInfo
UseIndexMerge bool
UseTableLookUp bool
UseTableLookUp atomic.Bool
}

// PartitionTelemetryInfo records table partition telemetry information during execution.
Expand Down
6 changes: 3 additions & 3 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3883,7 +3883,7 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn

func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLookUpReader) Executor {
if b.Ti != nil {
b.Ti.UseTableLookUp = true
b.Ti.UseTableLookUp.Store(true)
}
is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
if err := b.validCanReadTemporaryOrCacheTable(is.Table); err != nil {
Expand Down Expand Up @@ -4022,7 +4022,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMergeReader) Executor {
if b.Ti != nil {
b.Ti.UseIndexMerge = true
b.Ti.UseTableLookUp = true
b.Ti.UseTableLookUp.Store(true)
}
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
if err := b.validCanReadTemporaryOrCacheTable(ts.Table); err != nil {
Expand Down Expand Up @@ -4469,7 +4469,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalIndexLookUpReader,
lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) {
if builder.Ti != nil {
builder.Ti.UseTableLookUp = true
builder.Ti.UseTableLookUp.Store(true)
}
e, err := buildNoRangeIndexLookUpReader(builder.executorBuilder, v)
if err != nil {
Expand Down
14 changes: 14 additions & 0 deletions executor/issuetest/executor_issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1387,3 +1387,17 @@ PARTITION BY HASH (c5) PARTITIONS 4;`)
// Again, a simpler reproduce.
tk.MustQuery("select /*+ inl_join (t1, t2) */ t2.c5 from t1 right join t2 on t1.c2 = t2.c5 where not( t1.c2 between '4s7ht' and 'mj' );").Check(testkit.Rows())
}

func TestIssueRaceWhenBuildingExecutorConcurrently(t *testing.T) {
// issue: https://github.com/pingcap/tidb/issues/41412
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, c int, index idx_a(a), index idx_b(b))")
for i := 0; i < 2000; i++ {
v := i * 100
tk.MustExec("insert into t values(?, ?, ?)", v, v, v)
}
tk.MustQuery("select /*+ inl_merge_join(t1, t2) */ * from t t1 right join t t2 on t1.a = t2.b and t1.c = t2.c")
}
2 changes: 1 addition & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4094,7 +4094,7 @@ func (s *session) updateTelemetryMetric(es *executor.ExecStmt) {
telemetryCreateOrAlterUserUsage.Add(float64(ti.AccountLockTelemetry.CreateOrAlterUser))
}

if ti.UseTableLookUp && s.sessionVars.StoreBatchSize > 0 {
if ti.UseTableLookUp.Load() && s.sessionVars.StoreBatchSize > 0 {
telemetryStoreBatchedUsage.Inc()
}
}
Expand Down

0 comments on commit 663478d

Please sign in to comment.