Skip to content

Commit

Permalink
ddl: make sure put key into ETCD monotonously (#52381) (#52481)
Browse files Browse the repository at this point in the history
close #47060, close #52335
  • Loading branch information
ti-chi-bot authored Jun 5, 2024
1 parent 51ae335 commit 60e6e0b
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 1 deletion.
2 changes: 2 additions & 0 deletions ddl/syncer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_test(
name = "syncer_test",
srcs = ["syncer_test.go"],
flaky = True,
shard_count = 3,
deps = [
":syncer",
"//ddl",
Expand All @@ -41,5 +42,6 @@ go_test(
"@io_etcd_go_etcd_tests_v3//integration",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_x_sync//errgroup",
],
)
2 changes: 1 addition & 1 deletion ddl/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (s *schemaVersionSyncer) UpdateSelfVersion(ctx context.Context, jobID int64
var path string
if variable.EnableMDL.Load() {
path = fmt.Sprintf("%s/%d/%s", util.DDLAllSchemaVersionsByJob, jobID, s.ddlID)
err = util.PutKVToEtcd(ctx, s.etcdCli, keyOpDefaultRetryCnt, path, ver)
err = util.PutKVToEtcdMono(ctx, s.etcdCli, keyOpDefaultRetryCnt, path, ver)
} else {
path = s.selfSchemaVerPath
err = util.PutKVToEtcd(ctx, s.etcdCli, putKeyNoRetry, path, ver,
Expand Down
43 changes: 43 additions & 0 deletions ddl/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"runtime"
"strconv"
"testing"
"time"

Expand All @@ -35,6 +36,7 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/tests/v3/integration"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -182,3 +184,44 @@ func checkRespKV(t *testing.T, kvCount int, key, val string, kvs ...*mvccpb.KeyV
require.Equal(t, key, string(kv.Key))
require.Equal(t, val, string(kv.Value))
}

func TestPutKVToEtcdMono(t *testing.T) {
integration.BeforeTestExternal(t)

cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
cli := cluster.RandClient()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err := util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(1))
require.NoError(t, err)

err = util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(2))
require.NoError(t, err)

err = util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(3))
require.NoError(t, err)

eg := errgroup.Group{}
for i := 0; i < 30; i++ {
eg.Go(func() error {
err := util2.PutKVToEtcdMono(ctx, cli, 1, "testKey", strconv.Itoa(5))
return err
})
}
// PutKVToEtcdMono should be conflicted and get errors.
require.Error(t, eg.Wait())

eg = errgroup.Group{}
for i := 0; i < 30; i++ {
eg.Go(func() error {
err := util2.PutKVToEtcd(ctx, cli, 1, "testKey", strconv.Itoa(5))
return err
})
}
require.NoError(t, eg.Wait())

err = util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(1))
require.NoError(t, err)
}
48 changes: 48 additions & 0 deletions ddl/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,54 @@ func DeleteKeyFromEtcd(key string, etcdCli *clientv3.Client, retryCnt int, timeo
return errors.Trace(err)
}

// PutKVToEtcdMono puts key value to etcd monotonously.
// etcdCli is client of etcd.
// retryCnt is retry time when an error occurs.
// opts are configures of etcd Operations.
func PutKVToEtcdMono(ctx context.Context, etcdCli *clientv3.Client, retryCnt int, key, val string,
opts ...clientv3.OpOption) error {
var err error
for i := 0; i < retryCnt; i++ {
if err = ctx.Err(); err != nil {
return errors.Trace(err)
}

childCtx, cancel := context.WithTimeout(ctx, KeyOpDefaultTimeout)
var resp *clientv3.GetResponse
resp, err = etcdCli.Get(childCtx, key)
if err != nil {
cancel()
logutil.BgLogger().Warn("etcd-cli put kv failed", zap.String("category", "ddl"), zap.String("key", key), zap.String("value", val), zap.Error(err), zap.Int("retryCnt", i))
time.Sleep(KeyOpRetryInterval)
continue
}
prevRevision := int64(0)
if len(resp.Kvs) > 0 {
prevRevision = resp.Kvs[0].ModRevision
}

var txnResp *clientv3.TxnResponse
txnResp, err = etcdCli.Txn(childCtx).
If(clientv3.Compare(clientv3.ModRevision(key), "=", prevRevision)).
Then(clientv3.OpPut(key, val, opts...)).
Commit()

cancel()

if err == nil && txnResp.Succeeded {
return nil
}

if err == nil {
err = errors.New("performing compare-and-swap during PutKVToEtcd failed")
}

logutil.BgLogger().Warn("etcd-cli put kv failed", zap.String("category", "ddl"), zap.String("key", key), zap.String("value", val), zap.Error(err), zap.Int("retryCnt", i))
time.Sleep(KeyOpRetryInterval)
}
return errors.Trace(err)
}

// PutKVToEtcd puts key value to etcd.
// etcdCli is client of etcd.
// retryCnt is retry time when an error occurs.
Expand Down

0 comments on commit 60e6e0b

Please sign in to comment.