diff --git a/pkg/executor/index_lookup_hash_join.go b/pkg/executor/index_lookup_hash_join.go index dc74482ebb443..a22dba8ee3dca 100644 --- a/pkg/executor/index_lookup_hash_join.go +++ b/pkg/executor/index_lookup_hash_join.go @@ -196,6 +196,9 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) { func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) { if r != nil { e.IndexLookUpJoin.finished.Store(true) + if e.cancelFunc != nil { + e.cancelFunc() + } err := fmt.Errorf("%v", r) if !e.panicErr.Load() { @@ -217,13 +220,8 @@ func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) { task := &indexHashJoinTask{err: err} e.taskCh <- task } - - failpoint.Label("TestIssue49692End") - - if e.cancelFunc != nil { - e.cancelFunc() - } } + failpoint.Label("TestIssue49692End") e.workerWg.Done() } @@ -376,6 +374,11 @@ func (ow *indexHashJoinOuterWorker) run(ctx context.Context) { failpoint.Inject("testIndexHashJoinOuterWorkerErr", func() { err = errors.New("mockIndexHashJoinOuterWorkerErr") }) + failpoint.Inject("testIssue54055_1", func(val failpoint.Value) { + if val.(bool) { + err = errors.New("testIssue54055_1") + } + }) if err != nil { task = &indexHashJoinTask{err: err} if ow.keepOuterOrder { @@ -679,6 +682,12 @@ func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexH close(resultCh) } }() + failpoint.Inject("testIssue54055_2", func(val failpoint.Value) { + if val.(bool) { + time.Sleep(10 * time.Millisecond) + panic("testIssue54055_2") + } + }) var joinStartTime time.Time if iw.stats != nil { start := time.Now() diff --git a/pkg/executor/index_lookup_join_test.go b/pkg/executor/index_lookup_join_test.go index f9552e8496c52..f5410794c76a6 100644 --- a/pkg/executor/index_lookup_join_test.go +++ b/pkg/executor/index_lookup_join_test.go @@ -538,3 +538,30 @@ func TestIssue54688(t *testing.T) { rs.Close() } } + +func TestIssue54055(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("drop table if exists t, s;") + tk.MustExec("create table t(a int, index(a));") + tk.MustExec("create table s(a int, index(a));") + tk.MustExec("insert into t values(1), (2), (3), (4), (5), (6), (7), (8), (9), (10), (11), (12), (13), (14), (15), (16), (17), (18), (19), (20), (21), (22), (23), (24), (25), (26), (27), (28), (29), (30), (31), (32), (33), (34), (35), (36), (37), (38), (39), (40), (41), (42), (43), (44), (45), (46), (47), (48), (49), (50), (51), (52), (53), (54), (55), (56), (57), (58), (59), (60), (61), (62), (63), (64), (65), (66), (67), (68), (69), (70), (71), (72), (73), (74), (75), (76), (77), (78), (79), (80), (81), (82), (83), (84), (85), (86), (87), (88), (89), (90), (91), (92), (93), (94), (95), (96), (97), (98), (99), (100), (101), (102), (103), (104), (105), (106), (107), (108), (109), (110), (111), (112), (113), (114), (115), (116), (117), (118), (119), (120), (121), (122), (123), (124), (125), (126), (127), (128);") + tk.MustExec("insert into s values(1), (128);") + + tk.MustExec("set @@tidb_max_chunk_size=32;") + tk.MustExec("set @@tidb_index_join_batch_size=32;") + tk.MustExec("set @@tidb_index_lookup_join_concurrency=1;") + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/testIssue54055_1", "2*return(false)->1*return(true)")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/testIssue54055_2", "return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/testIssue54055_1")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/testIssue54055_2")) + }() + rs, err := tk.Exec("select /*+ INL_HASH_JOIN(s) */ * from t join s on t.a=s.a order by t.a;") + require.NoError(t, err) + _, err = session.GetRows4Test(context.Background(), nil, rs) + require.NotNil(t, err) + rs.Close() +}