Skip to content

Commit

Permalink
port test in 52467
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta committed Apr 11, 2024
1 parent 7c40f0b commit 16b091d
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 0 deletions.
7 changes: 7 additions & 0 deletions pkg/ddl/ingest/backend_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ import (
"strconv"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/lightning/backend/local"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/util/generic"
"github.com/pingcap/tidb/pkg/util/logutil"
kvutil "github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -82,6 +84,8 @@ func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
return true, nil
}

var ResignOwnerForTest = atomic.NewBool(false)

// Register creates a new backend and registers it to the backend context.
func (m *litBackendCtxMgr) Register(
ctx context.Context,
Expand All @@ -106,6 +110,9 @@ func (m *litBackendCtxMgr) Register(
logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
return nil, err
}
failpoint.Inject("beforeCreateLocalBackend", func() {
ResignOwnerForTest.Store(true)
})
bd, err := createLocalBackend(ctx, cfg, pdSvcDiscovery)
if err != nil {
logutil.Logger(ctx).Error(LitErrCreateBackendFail, zap.Int64("job ID", jobID), zap.Error(err))
Expand Down
10 changes: 10 additions & 0 deletions pkg/ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,16 @@ func (d *ddl) startDispatchLoop() {
time.Sleep(dispatchLoopWaitingDuration)
continue
}
failpoint.Inject("ownerResignAfterDispatchLoopCheck", func() {
if ingest.ResignOwnerForTest.Load() {
err2 := d.ownerManager.ResignOwner(context.Background())
if err2 != nil {
logutil.BgLogger().Info("resign meet error", zap.Error(err2))
}
time.Sleep(500 * time.Millisecond)
ingest.ResignOwnerForTest.Store(false)
}
})
select {
case <-d.ddlJobCh:
case <-ticker.C:
Expand Down
19 changes: 19 additions & 0 deletions pkg/lightning/backend/local/engine_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ import (
"math"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/vfs"
"github.com/docker/go-units"
"github.com/google/uuid"
"github.com/pingcap/errors"
Expand Down Expand Up @@ -581,6 +584,19 @@ func (em *engineManager) getBufferPool() *membuf.Pool {
return em.bufferPool
}

// only used in tests
type slowCreateFS struct {
vfs.FS
}

func (s slowCreateFS) Create(name string) (vfs.File, error) {
if strings.Contains(name, "temporary") {
// print stack
time.Sleep(time.Second)
}
return s.FS.Create(name)
}

func openDuplicateDB(storeDir string) (*pebble.DB, error) {
dbPath := filepath.Join(storeDir, duplicateDBName)
// TODO: Optimize the opts for better write.
Expand All @@ -589,6 +605,9 @@ func openDuplicateDB(storeDir string) (*pebble.DB, error) {
newRangePropertiesCollector,
},
}
failpoint.Inject("slowCreateFS", func() {
opts.FS = slowCreateFS{vfs.Default}
})
return pebble.Open(dbPath, opts)
}

Expand Down
41 changes: 41 additions & 0 deletions tests/realtikvtest/addindextest4/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,3 +538,44 @@ func TestAddUniqueIndexDuplicatedError(t *testing.T) {
tk.MustExec("INSERT INTO `b1cce552` (`f5d9aecb`, `d9337060`, `4c74082f`, `9215adc3`, `85ad5a07`, `8c60260f`, `8069da7b`, `91e218e1`) VALUES ('2031-12-22 06:44:52', 'duplicatevalue', 2028, NULL, 846, 'N6QD1=@ped@owVoJx', '9soPM2d6H', 'Tv%'), ('2031-12-22 06:44:52', 'duplicatevalue', 2028, NULL, 9052, '_HWaf#gD!bw', '9soPM2d6H', 'Tv%');")
tk.MustGetErrCode("ALTER TABLE `b1cce552` ADD unique INDEX `65290727` (`4c74082f`, `d9337060`, `8069da7b`);", errno.ErrDupEntry)
}
func TestFirstLitSlowStart(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("drop database if exists addindexlit;")
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)

tk.MustExec("create table t(a int, b int);")
tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3);")
tk.MustExec("create table t2(a int, b int);")
tk.MustExec("insert into t2 values (1, 1), (2, 2), (3, 3);")

tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use addindexlit;")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/ingest/beforeCreateLocalBackend", "1*return()"))
t.Cleanup(func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/ingest/beforeCreateLocalBackend"))
})
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/ownerResignAfterDispatchLoopCheck", "return()"))
t.Cleanup(func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/ownerResignAfterDispatchLoopCheck"))
})
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/lightning/backend/local/slowCreateFS", "return()"))
t.Cleanup(func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/lightning/backend/local/slowCreateFS"))
})

var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
tk.MustExec("alter table t add unique index idx(a);")
}()
go func() {
defer wg.Done()
tk1.MustExec("alter table t2 add unique index idx(a);")
}()
wg.Wait()
}

0 comments on commit 16b091d

Please sign in to comment.