Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support metadata lock #37393

Merged
merged 45 commits into from
Sep 18, 2022
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
260204d
init
wjhuang2016 Sep 8, 2022
1486a96
stable test
wjhuang2016 Sep 9, 2022
25637c4
refine comment
wjhuang2016 Sep 9, 2022
40df0e4
fmt
wjhuang2016 Sep 9, 2022
0d00275
refine
wjhuang2016 Sep 9, 2022
0b41074
address comment
wjhuang2016 Sep 9, 2022
8d5007e
refine
wjhuang2016 Sep 9, 2022
34d9be5
address comment
wjhuang2016 Sep 13, 2022
fd1ab6d
fix
wjhuang2016 Sep 13, 2022
bdf9405
refine
wjhuang2016 Sep 13, 2022
47e0103
refine
wjhuang2016 Sep 13, 2022
011e2b5
address comment
wjhuang2016 Sep 14, 2022
bf41fa5
fix
wjhuang2016 Sep 14, 2022
8ced48d
refine test
wjhuang2016 Sep 14, 2022
1886737
fix
wjhuang2016 Sep 14, 2022
c3b819f
refine
wjhuang2016 Sep 14, 2022
a798b02
add log
wjhuang2016 Sep 15, 2022
be5777b
revert default value
wjhuang2016 Sep 15, 2022
7ebaa06
fmt
wjhuang2016 Sep 15, 2022
2318638
fmt
wjhuang2016 Sep 15, 2022
8c50fbe
refine
wjhuang2016 Sep 15, 2022
59565ab
Merge branch 'master' of https://github.com/pingcap/tidb into mdl_demo
wjhuang2016 Sep 15, 2022
8f67df2
address comment
wjhuang2016 Sep 15, 2022
5d6cfdf
address comment
wjhuang2016 Sep 16, 2022
0d94462
refine
wjhuang2016 Sep 16, 2022
bc9de11
add comment
wjhuang2016 Sep 16, 2022
b532920
fix build
wjhuang2016 Sep 16, 2022
6d49169
fix bazel
wjhuang2016 Sep 16, 2022
4157a6e
Merge branch 'master' into mdl_demo
wjhuang2016 Sep 17, 2022
6dc847f
refine
wjhuang2016 Sep 17, 2022
82c1c79
Merge branch 'mdl_demo' of github.com:wjhuang2016/tidb into mdl_demo
wjhuang2016 Sep 17, 2022
bf082d0
try
wjhuang2016 Sep 17, 2022
fcdde8f
try
wjhuang2016 Sep 17, 2022
bdbd077
fix
wjhuang2016 Sep 17, 2022
a357f6a
refine
wjhuang2016 Sep 17, 2022
de40665
refine bazel
wjhuang2016 Sep 17, 2022
0cd8d1d
refine
wjhuang2016 Sep 17, 2022
13c48f4
refine
wjhuang2016 Sep 17, 2022
c25de3f
try
wjhuang2016 Sep 17, 2022
67ea0f1
clean
wjhuang2016 Sep 17, 2022
eb5997d
fix
wjhuang2016 Sep 17, 2022
e1547f4
refine
wjhuang2016 Sep 17, 2022
4822e8c
refine
wjhuang2016 Sep 17, 2022
52606da
Merge branch 'master' into mdl_demo
ti-chi-bot Sep 18, 2022
23a884f
Merge branch 'master' into mdl_demo
ti-chi-bot Sep 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ go_library(

go_test(
name = "ddl_test",
timeout = "moderate",
timeout = "long",
srcs = [
"attributes_sql_test.go",
"callback_test.go",
Expand Down
1 change: 1 addition & 0 deletions ddl/column_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,7 @@ func TestWriteReorgForColumnTypeChangeOnAmendTxn(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, columnModifyLease)

tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_metadata_lock=0")
tk.MustExec("set global tidb_enable_amend_pessimistic_txn = ON")
defer tk.MustExec("set global tidb_enable_amend_pessimistic_txn = OFF")

Expand Down
21 changes: 21 additions & 0 deletions ddl/concurrentddltest/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "concurrentddltest_test",
srcs = [
"main_test.go",
"switch_test.go",
],
deps = [
"//config",
"//ddl",
"//kv",
"//meta",
"//testkit",
"//testkit/testsetup",
"//util",
"@com_github_stretchr_testify//require",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_goleak//:goleak",
],
)
44 changes: 44 additions & 0 deletions ddl/concurrentddltest/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package concurrentddltest

import (
"testing"
"time"

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/testkit/testsetup"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
testsetup.SetupForCommonTest()

config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.AsyncCommit.SafeWindow = 0
conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0
})

ddl.SetWaitTimeWhenErrorOccurred(time.Microsecond)

opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
}

goleak.VerifyTestMain(m, opts...)
}
139 changes: 139 additions & 0 deletions ddl/concurrentddltest/switch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package concurrentddltest

import (
"context"
"fmt"
"math/rand"
"testing"
"time"

"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

func TestConcurrentDDLSwitch(t *testing.T) {
store := testkit.CreateMockStore(t)

type table struct {
columnIdx int
indexIdx int
}

var tables []*table
tblCount := 20
for i := 0; i < tblCount; i++ {
tables = append(tables, &table{1, 0})
}

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set global tidb_enable_metadata_lock=0")
tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt=1")
tk.MustExec("set @@global.tidb_ddl_reorg_batch_size=32")

for i := range tables {
tk.MustExec(fmt.Sprintf("create table t%d (col0 int) partition by range columns (col0) ("+
"partition p1 values less than (100), "+
"partition p2 values less than (300), "+
"partition p3 values less than (500), "+
"partition p4 values less than (700), "+
"partition p5 values less than (1000), "+
"partition p6 values less than maxvalue);",
i))
for j := 0; j < 1000; j++ {
tk.MustExec(fmt.Sprintf("insert into t%d values (%d)", i, j))
}
}

ddls := make([]string, 0, tblCount)
ddlCount := 100
for i := 0; i < ddlCount; i++ {
tblIdx := rand.Intn(tblCount)
if rand.Intn(2) == 0 {
ddls = append(ddls, fmt.Sprintf("alter table t%d add index idx%d (col0)", tblIdx, tables[tblIdx].indexIdx))
tables[tblIdx].indexIdx++
} else {
ddls = append(ddls, fmt.Sprintf("alter table t%d add column col%d int", tblIdx, tables[tblIdx].columnIdx))
tables[tblIdx].columnIdx++
}
}

c := atomic.NewInt32(0)
ch := make(chan struct{})
go func() {
var wg util.WaitGroupWrapper
for i := range ddls {
wg.Add(1)
go func(idx int) {
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec(ddls[idx])
c.Add(1)
wg.Done()
}(i)
}
wg.Wait()
ch <- struct{}{}
}()

ticker := time.NewTicker(time.Second)
count := 0
done := false
for !done {
select {
case <-ch:
done = true
case <-ticker.C:
var b bool
var err error
err = kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, false, func(ctx context.Context, txn kv.Transaction) error {
b, err = meta.NewMeta(txn).IsConcurrentDDL()
return err
})
require.NoError(t, err)
rs, err := testkit.NewTestKit(t, store).Exec(fmt.Sprintf("set @@global.tidb_enable_concurrent_ddl=%t", !b))
if rs != nil {
require.NoError(t, rs.Close())
}
if err == nil {
count++
if b {
tk := testkit.NewTestKit(t, store)
tk.MustQuery("select count(*) from mysql.tidb_ddl_job").Check(testkit.Rows("0"))
tk.MustQuery("select count(*) from mysql.tidb_ddl_reorg").Check(testkit.Rows("0"))
}
}
}
}

require.Equal(t, int32(ddlCount), c.Load())
require.Greater(t, count, 0)

tk = testkit.NewTestKit(t, store)
tk.MustExec("use test")
for i, tbl := range tables {
tk.MustQuery(fmt.Sprintf("select count(*) from information_schema.columns where TABLE_SCHEMA = 'test' and TABLE_NAME = 't%d'", i)).Check(testkit.Rows(fmt.Sprintf("%d", tbl.columnIdx)))
tk.MustExec(fmt.Sprintf("admin check table t%d", i))
for j := 0; j < tbl.indexIdx; j++ {
tk.MustExec(fmt.Sprintf("admin check index t%d idx%d", i, j))
}
}
}
2 changes: 2 additions & 0 deletions ddl/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const (
ReorgTableID = meta.MaxInt48 - 2
// HistoryTableID is the table ID of `tidb_ddl_history`.
HistoryTableID = meta.MaxInt48 - 3
// MDLTableID is the table ID of `tidb_mdl_info`.
MDLTableID = meta.MaxInt48 - 4

// JobTableSQL is the CREATE TABLE SQL of `tidb_ddl_job`.
JobTableSQL = "create table " + JobTable + "(job_id bigint not null, reorg int, schema_ids text(65535), table_ids text(65535), job_meta longblob, type int, processing int, primary key(job_id))"
Expand Down
15 changes: 2 additions & 13 deletions ddl/db_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/auth"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/testkit/external"
"github.com/pingcap/tidb/util/dbterror"
Expand Down Expand Up @@ -129,20 +127,10 @@ func TestIndexOnCacheTable(t *testing.T) {
}

func TestAlterTableCache(t *testing.T) {
store, err := mockstore.NewMockStore()
require.NoError(t, err)
session.SetSchemaLease(600 * time.Millisecond)
session.DisableStats4Test()
dom, err := session.BootstrapSession(store)
require.NoError(t, err)
store, dom := testkit.CreateMockStoreAndDomain(t)

dom.SetStatsUpdating(true)

t.Cleanup(func() {
dom.Close()
err := store.Close()
require.NoError(t, err)
})
tk := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)

Expand All @@ -157,6 +145,7 @@ func TestAlterTableCache(t *testing.T) {
checkTableCacheStatus(t, tk, "test", "t1", model.TableCacheStatusEnable)
tk.MustExec("alter table t1 nocache")
tk.MustExec("drop table if exists t1")
tk.MustExec("set global tidb_enable_metadata_lock=0")
/*Test can't skip schema checker*/
tk.MustExec("drop table if exists t1,t2")
tk.MustExec("CREATE TABLE t1 (a int)")
Expand Down
2 changes: 2 additions & 0 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3324,6 +3324,7 @@ func TestPartitionErrorCode(t *testing.T) {
// Reduce the impact on DML when executing partition DDL
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk1.MustExec("set global tidb_enable_metadata_lock=0")
tk1.MustExec("drop table if exists t;")
tk1.MustExec(`create table t(id int primary key)
partition by hash(id) partitions 4;`)
Expand Down Expand Up @@ -3485,6 +3486,7 @@ func TestCommitWhenSchemaChange(t *testing.T) {
})
store := testkit.CreateMockStoreWithSchemaLease(t, time.Second)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_metadata_lock=0")
tk.MustExec("set @@global.tidb_max_delta_schema_count= 4096")
tk.MustExec("use test")
tk.MustExec(`create table schema_change (a int, b timestamp)
Expand Down
1 change: 1 addition & 0 deletions ddl/db_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,7 @@ func TestLockTables(t *testing.T) {
store := testkit.CreateMockStoreWithSchemaLease(t, time.Microsecond*500)
setTxnTk := testkit.NewTestKit(t, store)
setTxnTk.MustExec("set global tidb_txn_mode=''")
setTxnTk.MustExec("set global tidb_enable_metadata_lock=0")
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1,t2")
Expand Down
24 changes: 16 additions & 8 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ func TestIssue22819(t *testing.T) {
store := testkit.CreateMockStoreWithSchemaLease(t, dbTestLease)

tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("set global tidb_enable_metadata_lock=0")
tk1.MustExec("use test;")
tk1.MustExec("create table t1 (v int) partition by hash (v) partitions 2")
tk1.MustExec("insert into t1 values (1)")
Expand Down Expand Up @@ -817,7 +818,11 @@ func TestForbidCacheTableForSystemTable(t *testing.T) {
for _, one := range sysTables {
err := tk.ExecToErr(fmt.Sprintf("alter table `%s` cache", one))
if db == "MySQL" {
require.EqualError(t, err, "[ddl:8200]ALTER table cache for tables in system database is currently unsupported")
if one == "tidb_mdl_view" {
require.EqualError(t, err, "[ddl:1347]'MySQL.tidb_mdl_view' is not BASE TABLE")
} else {
require.EqualError(t, err, "[ddl:8200]ALTER table cache for tables in system database is currently unsupported")
}
} else {
require.EqualError(t, err, fmt.Sprintf("[planner:1142]ALTER command denied to user 'root'@'%%' for table '%s'", strings.ToLower(one)))
}
Expand Down Expand Up @@ -980,6 +985,7 @@ func TestCommitTxnWithIndexChange(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease)
// Prepare work.
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_metadata_lock=0")
tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;")
tk.MustExec("use test")
tk.MustExec("create table t1 (c1 int primary key, c2 int, c3 int, index ok2(c2))")
Expand Down Expand Up @@ -1325,6 +1331,7 @@ func TestTxnSavepointWithDDL(t *testing.T) {
tk := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("set global tidb_enable_metadata_lock=0")
tk2.MustExec("use test;")

prepareFn := func() {
Expand Down Expand Up @@ -1376,6 +1383,7 @@ func TestAmendTxnSavepointWithDDL(t *testing.T) {
tk := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("set global tidb_enable_metadata_lock=0")
tk2.MustExec("use test;")
tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;")

Expand Down Expand Up @@ -1437,7 +1445,7 @@ func TestSnapshotVersion(t *testing.T) {

// For updating the self schema version.
goCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
err := dd.SchemaSyncer().OwnerCheckAllVersions(goCtx, is.SchemaMetaVersion())
err := dd.SchemaSyncer().OwnerCheckAllVersions(goCtx, 0, is.SchemaMetaVersion())
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
cancel()
require.NoError(t, err)

Expand All @@ -1447,7 +1455,7 @@ func TestSnapshotVersion(t *testing.T) {

// Make sure that the self schema version doesn't be changed.
goCtx, cancel = context.WithTimeout(context.Background(), 100*time.Millisecond)
err = dd.SchemaSyncer().OwnerCheckAllVersions(goCtx, is.SchemaMetaVersion())
err = dd.SchemaSyncer().OwnerCheckAllVersions(goCtx, 0, is.SchemaMetaVersion())
cancel()
require.NoError(t, err)

Expand Down Expand Up @@ -1498,33 +1506,33 @@ func TestSchemaValidator(t *testing.T) {
require.NoError(t, err)

ts := ver.Ver
_, res := dom.SchemaValidator.Check(ts, schemaVer, nil)
_, res := dom.SchemaValidator.Check(ts, schemaVer, nil, true)
require.Equal(t, domain.ResultSucc, res)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/ErrorMockReloadFailed", `return(true)`))

err = dom.Reload()
require.Error(t, err)
_, res = dom.SchemaValidator.Check(ts, schemaVer, nil)
_, res = dom.SchemaValidator.Check(ts, schemaVer, nil, true)
require.Equal(t, domain.ResultSucc, res)
time.Sleep(dbTestLease)

ver, err = store.CurrentVersion(kv.GlobalTxnScope)
require.NoError(t, err)
ts = ver.Ver
_, res = dom.SchemaValidator.Check(ts, schemaVer, nil)
_, res = dom.SchemaValidator.Check(ts, schemaVer, nil, true)
require.Equal(t, domain.ResultUnknown, res)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/ErrorMockReloadFailed"))
err = dom.Reload()
require.NoError(t, err)

_, res = dom.SchemaValidator.Check(ts, schemaVer, nil)
_, res = dom.SchemaValidator.Check(ts, schemaVer, nil, true)
require.Equal(t, domain.ResultSucc, res)

// For schema check, it tests for getting the result of "ResultUnknown".
is := dom.InfoSchema()
schemaChecker := domain.NewSchemaChecker(dom, is.SchemaMetaVersion(), nil)
schemaChecker := domain.NewSchemaChecker(dom, is.SchemaMetaVersion(), nil, true)
// Make sure it will retry one time and doesn't take a long time.
domain.SchemaOutOfDateRetryTimes.Store(1)
domain.SchemaOutOfDateRetryInterval.Store(time.Millisecond * 1)
Expand Down
Loading