From 312b69f2d7be80e50b400bebd438f1adb60155e7 Mon Sep 17 00:00:00 2001 From: wjHuang Date: Wed, 10 Apr 2024 00:00:21 +0800 Subject: [PATCH] ddl: make sure put key into ETCD monotonously (#52381) close pingcap/tidb#47060, close pingcap/tidb#52335 --- pkg/ddl/syncer/BUILD.bazel | 1 + pkg/ddl/syncer/syncer.go | 2 +- pkg/ddl/syncer/syncer_test.go | 42 ++++++++++++++++++++++++++++++ pkg/ddl/util/util.go | 48 +++++++++++++++++++++++++++++++++++ 4 files changed, 92 insertions(+), 1 deletion(-) diff --git a/pkg/ddl/syncer/BUILD.bazel b/pkg/ddl/syncer/BUILD.bazel index 672aca31a4870..a1cc2e9b691dd 100644 --- a/pkg/ddl/syncer/BUILD.bazel +++ b/pkg/ddl/syncer/BUILD.bazel @@ -33,6 +33,7 @@ go_test( "syncer_test.go", ], flaky = True, + shard_count = 3, deps = [ ":syncer", "//pkg/ddl", diff --git a/pkg/ddl/syncer/syncer.go b/pkg/ddl/syncer/syncer.go index ab225797696d0..d47d284b436a1 100644 --- a/pkg/ddl/syncer/syncer.go +++ b/pkg/ddl/syncer/syncer.go @@ -238,7 +238,7 @@ func (s *schemaVersionSyncer) UpdateSelfVersion(ctx context.Context, jobID int64 var path string if variable.EnableMDL.Load() { path = fmt.Sprintf("%s/%d/%s", util.DDLAllSchemaVersionsByJob, jobID, s.ddlID) - err = util.PutKVToEtcd(ctx, s.etcdCli, keyOpDefaultRetryCnt, path, ver) + err = util.PutKVToEtcdMono(ctx, s.etcdCli, keyOpDefaultRetryCnt, path, ver) } else { path = s.selfSchemaVerPath err = util.PutKVToEtcd(ctx, s.etcdCli, putKeyNoRetry, path, ver, diff --git a/pkg/ddl/syncer/syncer_test.go b/pkg/ddl/syncer/syncer_test.go index a1d63bda02094..bb51a7ecb6c77 100644 --- a/pkg/ddl/syncer/syncer_test.go +++ b/pkg/ddl/syncer/syncer_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "runtime" + "strconv" "testing" "time" @@ -182,3 +183,44 @@ func checkRespKV(t *testing.T, kvCount int, key, val string, kvs ...*mvccpb.KeyV require.Equal(t, key, string(kv.Key)) require.Equal(t, val, string(kv.Value)) } + +func TestPutKVToEtcdMono(t *testing.T) { + integration.BeforeTestExternal(t) + + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + cli := cluster.RandClient() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err := util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(1)) + require.NoError(t, err) + + err = util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(2)) + require.NoError(t, err) + + err = util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(3)) + require.NoError(t, err) + + eg := util.NewErrorGroupWithRecover() + for i := 0; i < 30; i++ { + eg.Go(func() error { + err := util2.PutKVToEtcdMono(ctx, cli, 1, "testKey", strconv.Itoa(5)) + return err + }) + } + // PutKVToEtcdMono should be conflicted and get errors. + require.Error(t, eg.Wait()) + + eg = util.NewErrorGroupWithRecover() + for i := 0; i < 30; i++ { + eg.Go(func() error { + err := util2.PutKVToEtcd(ctx, cli, 1, "testKey", strconv.Itoa(5)) + return err + }) + } + require.NoError(t, eg.Wait()) + + err = util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(1)) + require.NoError(t, err) +} diff --git a/pkg/ddl/util/util.go b/pkg/ddl/util/util.go index 288827694e624..fa61d6f5f541e 100644 --- a/pkg/ddl/util/util.go +++ b/pkg/ddl/util/util.go @@ -288,6 +288,54 @@ func DeleteKeyFromEtcd(key string, etcdCli *clientv3.Client, retryCnt int, timeo return errors.Trace(err) } +// PutKVToEtcdMono puts key value to etcd monotonously. +// etcdCli is client of etcd. +// retryCnt is retry time when an error occurs. +// opts are configures of etcd Operations. +func PutKVToEtcdMono(ctx context.Context, etcdCli *clientv3.Client, retryCnt int, key, val string, + opts ...clientv3.OpOption) error { + var err error + for i := 0; i < retryCnt; i++ { + if err = ctx.Err(); err != nil { + return errors.Trace(err) + } + + childCtx, cancel := context.WithTimeout(ctx, KeyOpDefaultTimeout) + var resp *clientv3.GetResponse + resp, err = etcdCli.Get(childCtx, key) + if err != nil { + cancel() + logutil.BgLogger().Warn("etcd-cli put kv failed", zap.String("category", "ddl"), zap.String("key", key), zap.String("value", val), zap.Error(err), zap.Int("retryCnt", i)) + time.Sleep(KeyOpRetryInterval) + continue + } + prevRevision := int64(0) + if len(resp.Kvs) > 0 { + prevRevision = resp.Kvs[0].ModRevision + } + + var txnResp *clientv3.TxnResponse + txnResp, err = etcdCli.Txn(childCtx). + If(clientv3.Compare(clientv3.ModRevision(key), "=", prevRevision)). + Then(clientv3.OpPut(key, val, opts...)). + Commit() + + cancel() + + if err == nil && txnResp.Succeeded { + return nil + } + + if err == nil { + err = errors.New("performing compare-and-swap during PutKVToEtcd failed") + } + + logutil.BgLogger().Warn("etcd-cli put kv failed", zap.String("category", "ddl"), zap.String("key", key), zap.String("value", val), zap.Error(err), zap.Int("retryCnt", i)) + time.Sleep(KeyOpRetryInterval) + } + return errors.Trace(err) +} + // PutKVToEtcd puts key value to etcd. // etcdCli is client of etcd. // retryCnt is retry time when an error occurs.