Skip to content

Commit

Permalink
Merge branch 'master' into fix-datarace
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei committed Feb 2, 2023
2 parents b7b6a51 + 04f9b3c commit d8dd633
Show file tree
Hide file tree
Showing 20 changed files with 391 additions and 70 deletions.
5 changes: 4 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ type Config struct {
TiFlashComputeAutoScalerAddr string `toml:"autoscaler-addr" json:"autoscaler-addr"`
IsTiFlashComputeFixedPool bool `toml:"is-tiflashcompute-fixed-pool" json:"is-tiflashcompute-fixed-pool"`
AutoScalerClusterID string `toml:"autoscaler-cluster-id" json:"autoscaler-cluster-id"`
// todo: remove this after AutoScaler is stable.
UseAutoScaler bool `toml:"use-autoscaler" json:"use-autoscaler"`

// TiDBMaxReuseChunk indicates max cached chunk num
TiDBMaxReuseChunk uint32 `toml:"tidb-max-reuse-chunk" json:"tidb-max-reuse-chunk"`
Expand Down Expand Up @@ -1012,6 +1014,7 @@ var defaultConf = Config{
TiFlashComputeAutoScalerAddr: tiflashcompute.DefAWSAutoScalerAddr,
IsTiFlashComputeFixedPool: false,
AutoScalerClusterID: "",
UseAutoScaler: true,
TiDBMaxReuseChunk: 64,
TiDBMaxReuseColumn: 256,
TiDBEnableExitCheck: false,
Expand Down Expand Up @@ -1348,7 +1351,7 @@ func (c *Config) Valid() error {
}

// Check tiflash_compute topo fetch is valid.
if c.DisaggregatedTiFlash {
if c.DisaggregatedTiFlash && c.UseAutoScaler {
if !tiflashcompute.IsValidAutoScalerConfig(c.TiFlashComputeAutoScalerType) {
return fmt.Errorf("invalid AutoScaler type, expect %s, %s or %s, got %s",
tiflashcompute.MockASStr, tiflashcompute.AWSASStr, tiflashcompute.GCPASStr, c.TiFlashComputeAutoScalerType)
Expand Down
4 changes: 4 additions & 0 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ autoscaler-addr = "tiflash-autoscale-lb.tiflash-autoscale.svc.cluster.local:8081
# Only meaningful when disaggregated-tiflash is true.
autoscaler-cluster-id = ""

# use-autoscaler indicates whether use AutoScaler or PD for tiflash_compute nodes, only meaningful when disaggregated-tiflash is true.
# Will remove this after AutoScaler is stable.
use-autoscaler = true

[log]
# Log level: debug, info, warn, error, fatal.
level = "info"
Expand Down
12 changes: 6 additions & 6 deletions ddl/attributes_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ PARTITION BY RANGE (c) (
func TestFlashbackTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -327,7 +327,7 @@ PARTITION BY RANGE (c) (
func TestDropTable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -380,7 +380,7 @@ PARTITION BY RANGE (c) (
func TestCreateWithSameName(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -444,7 +444,7 @@ PARTITION BY RANGE (c) (
func TestPartition(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -504,7 +504,7 @@ PARTITION BY RANGE (c) (
func TestDropSchema(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand All @@ -530,7 +530,7 @@ PARTITION BY RANGE (c) (
func TestDefaultKeyword(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down
1 change: 1 addition & 0 deletions ddl/indexmergetest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_test(
"merge_test.go",
],
flaky = True,
race = "on",
shard_count = 4,
deps = [
"//config",
Expand Down
3 changes: 3 additions & 0 deletions ddl/indexmergetest/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ func TestAddIndexMergeConflictWithPessimistic(t *testing.T) {
callback := &callback.TestDDLCallback{Do: dom}

runPessimisticTxn := false
afterPessDML := make(chan struct{}, 1)
callback.OnJobRunBeforeExported = func(job *model.Job) {
if t.Failed() {
return
Expand All @@ -500,6 +501,7 @@ func TestAddIndexMergeConflictWithPessimistic(t *testing.T) {
assert.NoError(t, err)
_, err = tk2.Exec("update t set a = 3 where id = 1;")
assert.NoError(t, err)
afterPessDML <- struct{}{}
}
}
dom.DDL().SetHook(callback)
Expand All @@ -515,6 +517,7 @@ func TestAddIndexMergeConflictWithPessimistic(t *testing.T) {
case <-afterCommit:
require.Fail(t, "should be blocked by the pessimistic txn")
}
<-afterPessDML
tk2.MustExec("rollback;")
<-afterCommit
dom.DDL().SetHook(originHook)
Expand Down
2 changes: 1 addition & 1 deletion ddl/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestMain(m *testing.M) {
conf.Experimental.AllowsExpressionIndex = true
})

_, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, true)
_, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "ddl: infosync.GlobalInfoSyncerInit: %v\n", err)
os.Exit(1)
Expand Down
4 changes: 2 additions & 2 deletions domain/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestNormalSessionPool(t *testing.T) {
domain, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domain.Close()
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, true)
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true)
require.NoError(t, err1)
conf := config.GetGlobalConfig()
conf.Socket = ""
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestAbnormalSessionPool(t *testing.T) {
domain, err := session.BootstrapSession(store)
require.NoError(t, err)
defer domain.Close()
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, true)
info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true)
require.NoError(t, err1)
conf := config.GetGlobalConfig()
conf.Socket = ""
Expand Down
87 changes: 53 additions & 34 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,20 +98,26 @@ func NewMockDomain() *Domain {
// Domain represents a storage space. Different domains can use the same database name.
// Multiple domains can be used in parallel without synchronization.
type Domain struct {
store kv.Storage
infoCache *infoschema.InfoCache
privHandle *privileges.Handle
bindHandle atomic.Pointer[bindinfo.BindHandle]
statsHandle unsafe.Pointer
statsLease time.Duration
ddl ddl.DDL
info *infosync.InfoSyncer
globalCfgSyncer *globalconfigsync.GlobalConfigSyncer
m sync.Mutex
SchemaValidator SchemaValidator
sysSessionPool *sessionPool
exit chan struct{}
etcdClient *clientv3.Client
store kv.Storage
infoCache *infoschema.InfoCache
privHandle *privileges.Handle
bindHandle atomic.Pointer[bindinfo.BindHandle]
statsHandle unsafe.Pointer
statsLease time.Duration
ddl ddl.DDL
info *infosync.InfoSyncer
globalCfgSyncer *globalconfigsync.GlobalConfigSyncer
m sync.Mutex
SchemaValidator SchemaValidator
sysSessionPool *sessionPool
exit chan struct{}
// `etcdClient` must be used when keyspace is not set, or when the logic to each etcd path needs to be separated by keyspace.
etcdClient *clientv3.Client
// `unprefixedEtcdCli` will never set the etcd namespace prefix by keyspace.
// It is only used in storeMinStartTS and RemoveMinStartTS now.
// It must be used when the etcd path isn't needed to separate by keyspace.
// See keyspace RFC: https://github.com/pingcap/tidb/pull/39685
unprefixedEtcdCli *clientv3.Client
sysVarCache sysVarCache // replaces GlobalVariableCache
slowQuery *topNSlowQueries
expensiveQueryHandle *expensivequery.Handle
Expand Down Expand Up @@ -881,6 +887,10 @@ func (do *Domain) Close() {
terror.Log(errors.Trace(do.etcdClient.Close()))
}

if do.unprefixedEtcdCli != nil {
terror.Log(errors.Trace(do.unprefixedEtcdCli.Close()))
}

do.slowQuery.Close()
if do.cancel != nil {
do.cancel()
Expand Down Expand Up @@ -927,6 +937,27 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio

const serverIDForStandalone = 1 // serverID for standalone deployment.

func newEtcdCli(addrs []string, ebd kv.EtcdBackend) (*clientv3.Client, error) {
cfg := config.GetGlobalConfig()
etcdLogCfg := zap.NewProductionConfig()
etcdLogCfg.Level = zap.NewAtomicLevelAt(zap.ErrorLevel)
cli, err := clientv3.New(clientv3.Config{
LogConfig: &etcdLogCfg,
Endpoints: addrs,
AutoSyncInterval: 30 * time.Second,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithBackoffMaxDelay(time.Second * 3),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(cfg.TiKVClient.GrpcKeepAliveTime) * time.Second,
Timeout: time.Duration(cfg.TiKVClient.GrpcKeepAliveTimeout) * time.Second,
}),
},
TLS: ebd.TLSConfig(),
})
return cli, err
}

// Init initializes a domain.
func (do *Domain) Init(
ddlLease time.Duration,
Expand All @@ -942,32 +973,20 @@ func (do *Domain) Init(
return err
}
if addrs != nil {
cfg := config.GetGlobalConfig()
// silence etcd warn log, when domain closed, it won't randomly print warn log
// see details at the issue https://github.com/pingcap/tidb/issues/15479
etcdLogCfg := zap.NewProductionConfig()
etcdLogCfg.Level = zap.NewAtomicLevelAt(zap.ErrorLevel)
cli, err := clientv3.New(clientv3.Config{
LogConfig: &etcdLogCfg,
Endpoints: addrs,
AutoSyncInterval: 30 * time.Second,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithBackoffMaxDelay(time.Second * 3),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(cfg.TiKVClient.GrpcKeepAliveTime) * time.Second,
Timeout: time.Duration(cfg.TiKVClient.GrpcKeepAliveTimeout) * time.Second,
}),
},
TLS: ebd.TLSConfig(),
})
cli, err := newEtcdCli(addrs, ebd)
if err != nil {
return errors.Trace(err)
}

etcd.SetEtcdCliByNamespace(cli, keyspace.MakeKeyspaceEtcdNamespace(do.store.GetCodec()))

do.etcdClient = cli

unprefixedEtcdCli, err := newEtcdCli(addrs, ebd)
if err != nil {
return errors.Trace(err)
}
do.unprefixedEtcdCli = unprefixedEtcdCli
}
}

Expand Down Expand Up @@ -1029,7 +1048,7 @@ func (do *Domain) Init(

// step 1: prepare the info/schema syncer which domain reload needed.
skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard
do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, skipRegisterToDashboard)
do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, do.unprefixedEtcdCli, skipRegisterToDashboard)
if err != nil {
return err
}
Expand Down
37 changes: 22 additions & 15 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,18 @@ var ErrPrometheusAddrIsNotSet = dbterror.ClassDomain.NewStd(errno.ErrPrometheusA

// InfoSyncer stores server info to etcd when the tidb-server starts and delete when tidb-server shuts down.
type InfoSyncer struct {
etcdCli *clientv3.Client
info *ServerInfo
serverInfoPath string
minStartTS uint64
minStartTSPath string
managerMu struct {
// `etcdClient` must be used when keyspace is not set, or when the logic to each etcd path needs to be separated by keyspace.
etcdCli *clientv3.Client
// `unprefixedEtcdCli` will never set the etcd namespace prefix by keyspace.
// It is only used in storeMinStartTS and RemoveMinStartTS now.
// It must be used when the etcd path isn't needed to separate by keyspace.
// See keyspace RFC: https://github.com/pingcap/tidb/pull/39685
unprefixedEtcdCli *clientv3.Client
info *ServerInfo
serverInfoPath string
minStartTS uint64
minStartTSPath string
managerMu struct {
mu sync.RWMutex
util2.SessionManager
}
Expand Down Expand Up @@ -180,12 +186,13 @@ func setGlobalInfoSyncer(is *InfoSyncer) {
}

// GlobalInfoSyncerInit return a new InfoSyncer. It is exported for testing.
func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func() uint64, etcdCli *clientv3.Client, skipRegisterToDashBoard bool) (*InfoSyncer, error) {
func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func() uint64, etcdCli *clientv3.Client, unprefixedEtcdCli *clientv3.Client, skipRegisterToDashBoard bool) (*InfoSyncer, error) {
is := &InfoSyncer{
etcdCli: etcdCli,
info: getServerInfo(id, serverIDGetter),
serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id),
minStartTSPath: fmt.Sprintf("%s/%s", ServerMinStartTSPath, id),
etcdCli: etcdCli,
unprefixedEtcdCli: unprefixedEtcdCli,
info: getServerInfo(id, serverIDGetter),
serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id),
minStartTSPath: fmt.Sprintf("%s/%s", ServerMinStartTSPath, id),
}
err := is.init(ctx, skipRegisterToDashBoard)
if err != nil {
Expand Down Expand Up @@ -721,20 +728,20 @@ func (is *InfoSyncer) GetMinStartTS() uint64 {

// storeMinStartTS stores self server min start timestamp to etcd.
func (is *InfoSyncer) storeMinStartTS(ctx context.Context) error {
if is.etcdCli == nil {
if is.unprefixedEtcdCli == nil {
return nil
}
return util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, is.minStartTSPath,
return util.PutKVToEtcd(ctx, is.unprefixedEtcdCli, keyOpDefaultRetryCnt, is.minStartTSPath,
strconv.FormatUint(is.minStartTS, 10),
clientv3.WithLease(is.session.Lease()))
}

// RemoveMinStartTS removes self server min start timestamp from etcd.
func (is *InfoSyncer) RemoveMinStartTS() {
if is.etcdCli == nil {
if is.unprefixedEtcdCli == nil {
return
}
err := util.DeleteKeyFromEtcd(is.minStartTSPath, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout)
err := util.DeleteKeyFromEtcd(is.minStartTSPath, is.unprefixedEtcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout)
if err != nil {
logutil.BgLogger().Error("remove minStartTS failed", zap.Error(err))
}
Expand Down
6 changes: 3 additions & 3 deletions domain/infosync/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestTopology(t *testing.T) {
require.NoError(t, err)
}()

info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, false)
info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, false)
require.NoError(t, err)

err = info.newTopologySessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt)
Expand Down Expand Up @@ -152,7 +152,7 @@ func (is *InfoSyncer) ttlKeyExists(ctx context.Context) (bool, error) {
}

func TestPutBundlesRetry(t *testing.T) {
_, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, false)
_, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, false)
require.NoError(t, err)

bundle, err := placement.NewBundleFromOptions(&model.PlacementSettings{PrimaryRegion: "r1", Regions: "r1,r2"})
Expand Down Expand Up @@ -216,7 +216,7 @@ func TestPutBundlesRetry(t *testing.T) {

func TestTiFlashManager(t *testing.T) {
ctx := context.Background()
_, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, false)
_, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, false)
tiflash := NewMockTiFlash()
SetMockTiFlash(tiflash)

Expand Down
Loading

0 comments on commit d8dd633

Please sign in to comment.