ddl: fix flaky test TestCancel & fix panic when we failed to get job in delivery2Worker #53887

Merged · 5 commits · Jun 11, 2024

pkg/ddl/cancel_test.go (39 additions & 23 deletions)
@@ -17,6 +17,7 @@ package ddl_test
 import (
 	"fmt"
 	"strings"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -28,17 +29,18 @@ import (
 	"github.com/pingcap/tidb/pkg/parser/model"
 	"github.com/pingcap/tidb/pkg/testkit"
 	"github.com/pingcap/tidb/pkg/testkit/external"
+	"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
 	"github.com/stretchr/testify/require"
 	atomicutil "go.uber.org/atomic"
 )
 
 type testCancelJob struct {
-	sql         string
-	ok          bool
-	cancelState any // model.SchemaState | []model.SchemaState
-	onJobBefore bool
-	onJobUpdate bool
-	prepareSQL  []string
+	sql             string
+	expectCancelled bool
+	cancelState     any // model.SchemaState | []model.SchemaState
+	onJobBefore     bool
+	onJobUpdate     bool
+	prepareSQL      []string
 }
 
 var allTestCase = []testCancelJob{
@@ -204,6 +206,14 @@ func cancelSuccess(rs *testkit.Result) bool {
 }
 
 func TestCancel(t *testing.T) {
+	var enterCnt, exitCnt atomic.Int32
+	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeDelivery2Worker", func(job *model.Job) { enterCnt.Add(1) })
+	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDelivery2Worker", func(job *model.Job) { exitCnt.Add(1) })
+	waitDDLWorkerExited := func() {
+		require.Eventually(t, func() bool {
+			return enterCnt.Load() == exitCnt.Load()
+		}, 10*time.Second, 10*time.Millisecond)
+	}
 	store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, 100*time.Millisecond)
 	tk := testkit.NewTestKit(t, store)
 	tkCancel := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -244,28 +254,28 @@ func TestCancel(t *testing.T) {

hook := &callback.TestDDLCallback{Do: dom}
i := atomicutil.NewInt64(0)
cancel := atomicutil.NewBool(false)
canceled := atomicutil.NewBool(false)
cancelResult := atomicutil.NewBool(false)
cancelWhenReorgNotStart := atomicutil.NewBool(false)

hookFunc := func(job *model.Job) {
if testutil.TestMatchCancelState(t, job, allTestCase[i.Load()].cancelState, allTestCase[i.Load()].sql) && !cancel.Load() {
if testutil.TestMatchCancelState(t, job, allTestCase[i.Load()].cancelState, allTestCase[i.Load()].sql) && !canceled.Load() {
if !cancelWhenReorgNotStart.Load() && job.SchemaState == model.StateWriteReorganization && job.MayNeedReorg() && job.RowCount == 0 {
return
}
rs := tkCancel.MustQuery(fmt.Sprintf("admin cancel ddl jobs %d", job.ID))
cancelResult.Store(cancelSuccess(rs))
cancel.Store(true)
canceled.Store(true)
}
}
dom.DDL().SetHook(hook.Clone())

restHook := func(h *callback.TestDDLCallback) {
resetHook := func(h *callback.TestDDLCallback) {
h.OnJobRunBeforeExported = nil
h.OnJobUpdatedExported.Store(nil)
dom.DDL().SetHook(h.Clone())
}
registHook := func(h *callback.TestDDLCallback, onJobRunBefore bool) {
registerHook := func(h *callback.TestDDLCallback, onJobRunBefore bool) {
if onJobRunBefore {
h.OnJobRunBeforeExported = hookFunc
} else {
@@ -274,41 +284,47 @@ func TestCancel(t *testing.T) {
 		dom.DDL().SetHook(h.Clone())
 	}
 
+	waitDDLWorkerExited()
 	for j, tc := range allTestCase {
+		t.Logf("running test case %d: %s", j, tc.sql)
 		i.Store(int64(j))
 		msg := fmt.Sprintf("sql: %s, state: %s", tc.sql, tc.cancelState)
 		if tc.onJobBefore {
-			restHook(hook)
+			resetHook(hook)
 			for _, prepareSQL := range tc.prepareSQL {
 				tk.MustExec(prepareSQL)
 			}
-			cancel.Store(false)
+			waitDDLWorkerExited()
+			canceled.Store(false)
 			cancelWhenReorgNotStart.Store(true)
-			registHook(hook, true)
-			if tc.ok {
+			registerHook(hook, true)
+			if tc.expectCancelled {
 				tk.MustGetErrCode(tc.sql, errno.ErrCancelledDDLJob)
 			} else {
 				tk.MustExec(tc.sql)
 			}
-			if cancel.Load() {
-				require.Equal(t, tc.ok, cancelResult.Load(), msg)
+			waitDDLWorkerExited()
+			if canceled.Load() {
+				require.Equal(t, tc.expectCancelled, cancelResult.Load(), msg)
 			}
 		}
 		if tc.onJobUpdate {
-			restHook(hook)
+			resetHook(hook)
 			for _, prepareSQL := range tc.prepareSQL {
 				tk.MustExec(prepareSQL)
 			}
-			cancel.Store(false)
+			waitDDLWorkerExited()
+			canceled.Store(false)
 			cancelWhenReorgNotStart.Store(false)
-			registHook(hook, false)
-			if tc.ok {
+			registerHook(hook, false)
+			if tc.expectCancelled {
 				tk.MustGetErrCode(tc.sql, errno.ErrCancelledDDLJob)
 			} else {
 				tk.MustExec(tc.sql)
 			}
-			if cancel.Load() {
-				require.Equal(t, tc.ok, cancelResult.Load(), msg)
+			waitDDLWorkerExited()
+			if canceled.Load() {
+				require.Equal(t, tc.expectCancelled, cancelResult.Load(), msg)
 			}
 		}
 	}
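
The deflaking change above is essentially a quiescence barrier: the beforeDelivery2Worker and afterDelivery2Worker failpoints bump two atomic counters, and waitDDLWorkerExited spins until they agree, so no DDL job is still inside a worker when the next test case swaps hooks or runs its prepare statements. A standalone sketch of the same pattern (illustrative names, not TiDB's API):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Stand-ins for the counters that the real test bumps from the
// beforeDelivery2Worker / afterDelivery2Worker failpoint callbacks.
var enterCnt, exitCnt atomic.Int32

// deliverToWorker simulates delivery2Worker: count on entry, count on exit.
func deliverToWorker(jobID int) {
	enterCnt.Add(1)
	defer exitCnt.Add(1)
	time.Sleep(10 * time.Millisecond) // simulated job handling
}

// waitWorkersExited polls until every delivered job has left its worker,
// mirroring the test's require.Eventually(..., 10*time.Second, 10*time.Millisecond).
func waitWorkersExited(timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if enterCnt.Load() == exitCnt.Load() {
			return true
		}
		time.Sleep(10 * time.Millisecond)
	}
	return false
}

func main() {
	for i := 0; i < 5; i++ {
		go deliverToWorker(i)
	}
	// In the real test every delivery is counted before the wait begins (the
	// DDL statement has returned); this sleep stands in for that ordering.
	time.Sleep(time.Millisecond)
	fmt.Println("workers quiescent:", waitWorkersExited(10*time.Second))
}
```
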
pkg/ddl/job_table.go (4 additions & 3 deletions)
@@ -529,6 +529,7 @@ func (d *ddl) delivery2LocalWorker(pool *workerPool, task *limitJobTask) {
 
 // delivery2Worker owns the worker, need to put it back to the pool in this function.
 func (s *jobScheduler) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) {
+	failpoint.InjectCall("beforeDelivery2Worker", job)
 	injectFailPointForGetJob(job)
 	jobID, involvedSchemaInfos := job.ID, job.GetInvolvingSchemaInfo()
 	s.runningJobs.add(jobID, involvedSchemaInfos)
@@ -561,18 +562,18 @@ func (s *jobScheduler) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) {
 		// job is already moved to history.
 		failpoint.InjectCall("beforeRefreshJob", job)
 		for {
-			job, err = s.sysTblMgr.GetJobByID(s.schCtx, job.ID)
+			job, err = s.sysTblMgr.GetJobByID(s.schCtx, jobID)
 			failpoint.InjectCall("mockGetJobByIDFail", &err)
 			if err == nil {
 				break
 			}
 
 			if err == systable.ErrNotFound {
 				logutil.DDLLogger().Info("job not found, might already finished",
-					zap.Int64("job_id", job.ID), zap.Stringer("state", job.State))
+					zap.Int64("job_id", jobID))
 				return
 			}
-			logutil.DDLLogger().Error("get job failed", zap.Error(err))
+			logutil.DDLLogger().Error("get job failed", zap.Int64("job_id", jobID), zap.Error(err))
 			select {
 			case <-s.schCtx.Done():
 				return
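
This hunk is the panic fix from the PR title: the retry loop reassigns `job` on every GetJobByID call, and a failed lookup returns a nil job, so the old ErrNotFound and error branches dereferenced nil through job.ID and job.State. The fix reuses the jobID snapshot taken at the top of delivery2Worker. A reduced reproduction of the pattern (toy types, not TiDB's):

```go
package main

import (
	"errors"
	"fmt"
)

type Job struct{ ID int64 }

var errNotFound = errors.New("job not found")

// getJobByID stands in for sysTblMgr.GetJobByID: on error the returned
// *Job is nil, which is exactly what made the old log statement panic.
func getJobByID(id int64) (*Job, error) {
	return nil, errNotFound
}

func refreshJob(job *Job) {
	jobID := job.ID // snapshot first: `job` is clobbered inside the loop
	var err error
	for {
		job, err = getJobByID(jobID)
		if err == nil {
			break
		}
		if errors.Is(err, errNotFound) {
			// Logging job.ID here would dereference nil; log the snapshot.
			fmt.Printf("job not found, might already finished, job_id=%d\n", jobID)
			return
		}
		fmt.Printf("get job failed, job_id=%d, err=%v\n", jobID, err)
		return // the real loop retries after checking the scheduler context
	}
}

func main() {
	refreshJob(&Job{ID: 53887})
}
```
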
pkg/ddl/tests/adminpause/BUILD.bazel (1 addition & 0 deletions)
@@ -39,6 +39,7 @@ go_test(
        "//pkg/errno",
        "//pkg/parser/model",
        "//pkg/testkit",
+       "//pkg/testkit/testfailpoint",
        "//pkg/testkit/testsetup",
        "//pkg/util/sqlexec",
        "@com_github_pingcap_failpoint//:failpoint",

pkg/ddl/tests/adminpause/pause_cancel_test.go (4 additions & 3 deletions)
@@ -27,6 +27,7 @@ import (
 	"github.com/pingcap/tidb/pkg/errno"
 	"github.com/pingcap/tidb/pkg/parser/model"
 	"github.com/pingcap/tidb/pkg/testkit"
+	"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
 	"github.com/pingcap/tidb/pkg/util/sqlexec"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap"
@@ -90,7 +91,7 @@ func pauseAndCancelStmt(t *testing.T, stmtKit *testkit.TestKit, adminCommandKit
 	var isCancelled = &atomic.Bool{}
 	var cancelResultChn = make(chan []sqlexec.RecordSet, 1)
 	var cancelErrChn = make(chan error, 1)
-	var cancelFunc = func(jobType string) {
+	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeRefreshJob", func(*model.Job) {
 		Logger.Debug("pauseAndCancelStmt: OnGetJobBeforeExported, ",
 			zap.String("Expected Schema State", stmtCase.schemaState.String()))
 
@@ -107,7 +108,7 @@ func pauseAndCancelStmt(t *testing.T, stmtKit *testkit.TestKit, adminCommandKit
 
 			isCancelled.CompareAndSwap(false, true) // In case that it runs into this scope again and again
 		}
-	}
+	})
 	var verifyCancelResult = func(t *testing.T, adminCommandKit *testkit.TestKit) {
 		require.True(t, isCancelled.Load())
 
@@ -130,7 +131,6 @@ func pauseAndCancelStmt(t *testing.T, stmtKit *testkit.TestKit, adminCommandKit
 	originalHook := dom.DDL().GetHook()
 
 	hook.OnJobRunBeforeExported = pauseFunc
-	hook.OnGetJobBeforeExported = cancelFunc
 	dom.DDL().SetHook(hook.Clone())
 
 	isPaused.Store(false)
@@ -152,6 +152,7 @@ func pauseAndCancelStmt(t *testing.T, stmtKit *testkit.TestKit, adminCommandKit
 
 	// Release the hook, so that we could run the `rollbackStmts` successfully.
 	dom.DDL().SetHook(originalHook)
+	testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/beforeRefreshJob")
 
 	for _, rollbackStmt := range stmtCase.rollbackStmts {
 		// no care about the result here, since the `statement` could have been cancelled OR finished successfully.
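
Two details in this migration are worth noting: the cancel trigger now fires from the beforeRefreshJob failpoint rather than the removed OnGetJobBeforeExported hook, and since the callback can run on every retry of the refresh loop, an atomic.Bool keeps `admin cancel ddl jobs` to a single shot; the explicit Disable before the rollback statements stops the injection early rather than waiting for test cleanup. A minimal standalone sketch of that one-shot guard (illustrative names):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

var isCancelled atomic.Bool

// onBeforeRefreshJob mimics the failpoint callback: it may be invoked on
// every retry, but the cancel statement must be issued at most once.
func onBeforeRefreshJob(jobID int64) {
	if isCancelled.Load() {
		return
	}
	fmt.Printf("admin cancel ddl jobs %d\n", jobID)
	// Same guard as the test: "In case that it runs into this scope
	// again and again".
	isCancelled.CompareAndSwap(false, true)
}

func main() {
	for i := 0; i < 3; i++ {
		onBeforeRefreshJob(7) // prints only on the first call
	}
}
```
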
pkg/ddl/testutil/testutil.go (2 additions & 2 deletions)
@@ -107,14 +107,14 @@ func TestMatchCancelState(t *testing.T, job *model.Job, cancelState any, sql string) bool {
 	switch v := cancelState.(type) {
 	case model.SchemaState:
 		if job.Type == model.ActionMultiSchemaChange {
-			msg := fmt.Sprintf("unexpected multi-schema change(sql: %s, cancel state: %s)", sql, v)
+			msg := fmt.Sprintf("unexpected multi-schema change(sql: %s, cancel state: %s, job: %s)", sql, v, job.String())
 			require.Failf(t, msg, "use []model.SchemaState as cancel states instead")
 			return false
 		}
 		return job.SchemaState == v
 	case SubStates: // For multi-schema change sub-jobs.
 		if job.MultiSchemaInfo == nil {
-			msg := fmt.Sprintf("not multi-schema change(sql: %s, cancel state: %v)", sql, v)
+			msg := fmt.Sprintf("not multi-schema change(sql: %s, cancel state: %v, job: %s)", sql, v, job.String())
 			require.Failf(t, msg, "use model.SchemaState as the cancel state instead")
 			return false
 		}

pkg/testkit/testfailpoint/failpoint.go (5 additions & 0 deletions)
@@ -36,3 +36,8 @@ func EnableCall(t testing.TB, name string, fn any) {
 		require.NoError(t, failpoint.Disable(name))
 	})
 }
+
+// Disable disables fail-point.
+func Disable(t testing.TB, name string) {
+	require.NoError(t, failpoint.Disable(name))
+}
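
Taken together, EnableCall and the new Disable give tests a callback-style failpoint with either automatic (t.Cleanup) or explicit early teardown. A usage sketch, with the two helpers paired the same way pause_cancel_test.go pairs them above (the failpoint path is one added by this PR; the test name and body are illustrative):

```go
package ddl_test

import (
	"sync/atomic"
	"testing"

	"github.com/pingcap/tidb/pkg/parser/model"
	"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
)

func TestDeliveryObserved(t *testing.T) {
	var hits atomic.Int32

	// EnableCall wires the callback to the named failpoint and registers a
	// t.Cleanup that disables it when the test finishes.
	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeDelivery2Worker",
		func(job *model.Job) { hits.Add(1) })

	// ... run DDL statements that get delivered to a worker ...

	// Disable stops the injection before the test ends, e.g. ahead of
	// rollback statements, mirroring the pause_cancel_test.go change.
	testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/beforeDelivery2Worker")
	t.Logf("deliveries observed: %d", hits.Load())
}
```
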