From 10776b3bb93af520d40e612be01a1c36cceb5b3f Mon Sep 17 00:00:00 2001 From: Lynn Date: Thu, 4 May 2023 10:43:55 +0800 Subject: [PATCH] owner: add setting/getting the owner value info (#43353) close pingcap/tidb#43352 --- metrics/owner.go | 1 + owner/BUILD.bazel | 5 +- owner/manager.go | 122 +++++++++++++++++++++++++++++---- owner/manager_test.go | 156 ++++++++++++++++++++++++++++++------------ owner/mock.go | 49 +++++++++++++ 5 files changed, 273 insertions(+), 60 deletions(-) diff --git a/metrics/owner.go b/metrics/owner.go index 0ddc26d321683..be6d5bd656e1f 100644 --- a/metrics/owner.go +++ b/metrics/owner.go @@ -25,6 +25,7 @@ var ( WatcherClosed = "watcher_closed" Cancelled = "cancelled" Deleted = "deleted" + PutValue = "put_value" SessionDone = "session_done" CtxDone = "context_done" WatchOwnerCounter *prometheus.CounterVec diff --git a/owner/BUILD.bazel b/owner/BUILD.bazel index b304c3ef473f1..7477feaf3f3ae 100644 --- a/owner/BUILD.bazel +++ b/owner/BUILD.bazel @@ -9,11 +9,13 @@ go_library( importpath = "github.com/pingcap/tidb/owner", visibility = ["//visibility:public"], deps = [ + "//ddl/util", "//metrics", "//parser/terror", "//util", "//util/logutil", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@io_etcd_go_etcd_api_v3//mvccpb", "@io_etcd_go_etcd_api_v3//v3rpc/rpctypes", "@io_etcd_go_etcd_client_v3//:client", @@ -32,10 +34,11 @@ go_test( ], embed = [":owner"], flaky = True, - shard_count = 3, + shard_count = 4, deps = [ "//ddl", "//infoschema", + "//kv", "//parser/terror", "//store/mockstore", "//testkit/testsetup", diff --git a/owner/manager.go b/owner/manager.go index 4223a433b8b55..0c7a5cd8a333c 100644 --- a/owner/manager.go +++ b/owner/manager.go @@ -15,6 +15,7 @@ package owner import ( + "bytes" "context" "fmt" "os" @@ -25,6 +26,7 @@ import ( "unsafe" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/terror" util2 "github.com/pingcap/tidb/util" @@ -46,6 +48,8 @@ type Manager interface { RetireOwner() // GetOwnerID gets the owner ID. GetOwnerID(ctx context.Context) (string, error) + // SetOwnerOpValue updates the owner op value. + SetOwnerOpValue(ctx context.Context, op OpType) error // CampaignOwner campaigns the owner. CampaignOwner() error // ResignOwner lets the owner start a new election. @@ -65,6 +69,25 @@ const ( keyOpDefaultTimeout = 5 * time.Second ) +// OpType is the owner key value operation type. +type OpType byte + +// List operation of types. +const ( + OpNone OpType = 0 + OpGetUpgradingState OpType = 1 +) + +// String implements fmt.Stringer interface. +func (ot OpType) String() string { + switch ot { + case OpGetUpgradingState: + return "get upgrading state" + default: + return "none" + } +} + // DDLOwnerChecker is used to check whether tidb is owner. type DDLOwnerChecker interface { // IsOwner returns whether the ownerManager is the owner. @@ -225,6 +248,12 @@ func (m *ownerManager) campaignLoop(etcdSession *concurrency.Session) { return } case <-campaignContext.Done(): + failpoint.Inject("MockDelOwnerKey", func(v failpoint.Value) { + if v.(string) == "delOwnerKeyAndNotOwner" { + logutil.Logger(logCtx).Info("mock break campaign and don't clear related info") + return + } + }) logutil.Logger(logCtx).Info("break campaign loop, context is done") m.revokeSession(logPrefix, etcdSession.Lease()) return @@ -248,7 +277,7 @@ func (m *ownerManager) campaignLoop(etcdSession *concurrency.Session) { continue } - ownerKey, err := GetOwnerInfo(campaignContext, logCtx, elec, m.id) + ownerKey, err := GetOwnerKey(campaignContext, logCtx, m.etcdCli, m.key, m.id) if err != nil { continue } @@ -274,32 +303,97 @@ func (m *ownerManager) revokeSession(_ string, leaseID clientv3.LeaseID) { // GetOwnerID implements Manager.GetOwnerID interface. func (m *ownerManager) GetOwnerID(ctx context.Context) (string, error) { - resp, err := m.etcdCli.Get(ctx, m.key, clientv3.WithFirstCreate()...) + _, ownerID, _, _, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key) + return string(ownerID), errors.Trace(err) +} + +func getOwnerInfo(ctx, logCtx context.Context, etcdCli *clientv3.Client, ownerPath string) (string, []byte, OpType, int64, error) { + var op OpType + resp, err := etcdCli.Get(ctx, ownerPath, clientv3.WithFirstCreate()...) if err != nil { - return "", errors.Trace(err) + logutil.Logger(logCtx).Info("failed to get leader", zap.Error(err)) + return "", nil, op, 0, errors.Trace(err) } if len(resp.Kvs) == 0 { - return "", concurrency.ErrElectionNoLeader + return "", nil, op, 0, concurrency.ErrElectionNoLeader } - return string(resp.Kvs[0].Value), nil + + var ownerID []byte + ownerID, op = splitOwnerValues(resp.Kvs[0].Value) + logutil.Logger(logCtx).Info("get owner", zap.ByteString("owner key", resp.Kvs[0].Key), + zap.ByteString("ownerID", ownerID), zap.Stringer("op", op)) + return string(resp.Kvs[0].Key), ownerID, op, resp.Kvs[0].ModRevision, nil } -// GetOwnerInfo gets the owner information. -func GetOwnerInfo(ctx, logCtx context.Context, elec *concurrency.Election, id string) (string, error) { - resp, err := elec.Leader(ctx) +// GetOwnerKey gets the owner key information. +func GetOwnerKey(ctx, logCtx context.Context, etcdCli *clientv3.Client, etcdKey, id string) (string, error) { + ownerKey, ownerID, _, _, err := getOwnerInfo(ctx, logCtx, etcdCli, etcdKey) if err != nil { - // If no leader elected currently, it returns ErrElectionNoLeader. - logutil.Logger(logCtx).Info("failed to get leader", zap.Error(err)) return "", errors.Trace(err) } - ownerID := string(resp.Kvs[0].Value) - logutil.Logger(logCtx).Info("get owner", zap.String("ownerID", ownerID)) - if ownerID != id { + if string(ownerID) != id { logutil.Logger(logCtx).Warn("is not the owner") return "", errors.New("ownerInfoNotMatch") } - return string(resp.Kvs[0].Key), nil + return ownerKey, nil +} + +func splitOwnerValues(val []byte) ([]byte, OpType) { + vals := bytes.Split(val, []byte("_")) + var op OpType + if len(vals) == 2 { + op = OpType(vals[1][0]) + } + return vals[0], op +} + +func joinOwnerValues(vals ...[]byte) []byte { + return bytes.Join(vals, []byte("_")) +} + +// SetOwnerOpValue implements Manager.SetOwnerOpValue interface. +func (m *ownerManager) SetOwnerOpValue(ctx context.Context, op OpType) error { + // owner don't change. + ownerKey, ownerID, currOp, modRevision, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key) + if err != nil { + return errors.Trace(err) + } + if currOp == op { + logutil.Logger(m.logCtx).Info("set owner op is the same as the original, so do nothing.", zap.Stringer("op", op)) + return nil + } + if string(ownerID) != m.id { + return errors.New("ownerInfoNotMatch") + } + newOwnerVal := joinOwnerValues(ownerID, []byte{byte(op)}) + + failpoint.Inject("MockDelOwnerKey", func(v failpoint.Value) { + if valStr, ok := v.(string); ok { + if err := mockDelOwnerKey(valStr, ownerKey, m); err != nil { + failpoint.Return(err) + } + } + }) + + resp, err := m.etcdCli.Txn(ctx). + If(clientv3.Compare(clientv3.ModRevision(ownerKey), "=", modRevision)). + Then(clientv3.OpPut(ownerKey, string(newOwnerVal))). + Commit() + logutil.BgLogger().Info("set owner op value", zap.String("owner key", ownerKey), zap.ByteString("ownerID", ownerID), + zap.Stringer("old Op", currOp), zap.Stringer("op", op), zap.Bool("isSuc", resp.Succeeded), zap.Error(err)) + if !resp.Succeeded { + err = errors.New("put owner key failed, cmp is false") + } + metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.PutValue+"_"+metrics.RetLabel(err)).Inc() + return errors.Trace(err) +} + +// GetOwnerOpValue gets the owner op value. +func GetOwnerOpValue(ctx context.Context, etcdCli *clientv3.Client, ownerPath, logPrefix string) (OpType, error) { + logCtx := logutil.WithKeyValue(context.Background(), "owner info", logPrefix) + _, _, op, _, err := getOwnerInfo(ctx, logCtx, etcdCli, ownerPath) + return op, errors.Trace(err) } func (m *ownerManager) watchOwner(ctx context.Context, etcdSession *concurrency.Session, key string) { diff --git a/owner/manager_test.go b/owner/manager_test.go index fecad76341852..dcdfd2f49f95e 100644 --- a/owner/manager_test.go +++ b/owner/manager_test.go @@ -22,8 +22,10 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" . "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/store/mockstore" @@ -36,43 +38,65 @@ import ( const testLease = 5 * time.Millisecond -func TestSingle(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") - } - integration.BeforeTestExternal(t) +type testInfo struct { + store kv.Storage + cluster *integration.ClusterV3 + client *clientv3.Client + ddl DDL +} +func newTestInfo(t *testing.T) *testInfo { store, err := mockstore.NewMockStore() require.NoError(t, err) - defer func() { - err := store.Close() - require.NoError(t, err) - }() - - cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - defer cluster.Terminate(t) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 4}) - client := cluster.RandClient() - ctx := context.Background() + cli := cluster.Client(0) ic := infoschema.NewCache(2) ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) d := NewDDL( - ctx, - WithEtcdClient(client), + context.Background(), + WithEtcdClient(cli), WithStore(store), WithLease(testLease), WithInfoCache(ic), ) + + return &testInfo{ + store: store, + cluster: cluster, + client: cli, + ddl: d, + } +} + +func (ti *testInfo) Close(t *testing.T) { + err := ti.ddl.Stop() + require.NoError(t, err) + err = ti.store.Close() + require.NoError(t, err) + ti.cluster.Terminate(t) +} + +func TestSingle(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") + } + integration.BeforeTestExternal(t) + + tInfo := newTestInfo(t) + client, d := tInfo.client, tInfo.ddl + defer tInfo.Close(t) require.NoError(t, d.OwnerManager().CampaignOwner()) isOwner := checkOwner(d, true) require.True(t, isOwner) // test for newSession failed + ctx := context.Background() ctx, cancel := context.WithCancel(ctx) manager := owner.NewOwnerManager(ctx, client, "ddl", "ddl_id", DDLOwnerKey) cancel() - err = manager.CampaignOwner() + err := manager.CampaignOwner() comment := fmt.Sprintf("campaigned result don't match, err %v", err) require.True(t, terror.ErrorEqual(err, context.Canceled) || terror.ErrorEqual(err, context.DeadlineExceeded), comment) @@ -87,8 +111,69 @@ func TestSingle(t *testing.T) { time.Sleep(200 * time.Millisecond) // err is ok to be not nil since we canceled the manager. - ownerID, _ := manager.GetOwnerID(context.Background()) + ownerID, _ := manager.GetOwnerID(ctx) require.Equal(t, "", ownerID) + op, _ := owner.GetOwnerOpValue(ctx, client, DDLOwnerKey, "log prefix") + require.Equal(t, op, owner.OpNone) +} + +func TestSetAndGetOwnerOpValue(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") + } + integration.BeforeTestExternal(t) + + tInfo := newTestInfo(t) + defer tInfo.Close(t) + + require.NoError(t, tInfo.ddl.OwnerManager().CampaignOwner()) + isOwner := checkOwner(tInfo.ddl, true) + require.True(t, isOwner) + + // test set/get owner info + manager := tInfo.ddl.OwnerManager() + ownerID, err := manager.GetOwnerID(context.Background()) + require.NoError(t, err) + require.Equal(t, tInfo.ddl.GetID(), ownerID) + op, err := owner.GetOwnerOpValue(context.Background(), tInfo.client, DDLOwnerKey, "log prefix") + require.NoError(t, err) + require.Equal(t, op, owner.OpNone) + err = manager.SetOwnerOpValue(context.Background(), owner.OpGetUpgradingState) + require.NoError(t, err) + op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, DDLOwnerKey, "log prefix") + require.NoError(t, err) + require.Equal(t, op, owner.OpGetUpgradingState) + // update the same as the original value + err = manager.SetOwnerOpValue(context.Background(), owner.OpGetUpgradingState) + require.NoError(t, err) + op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, DDLOwnerKey, "log prefix") + require.NoError(t, err) + require.Equal(t, op, owner.OpGetUpgradingState) + // test del owner key when SetOwnerOpValue + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/owner/MockDelOwnerKey", `return("delOwnerKeyAndNotOwner")`)) + err = manager.SetOwnerOpValue(context.Background(), owner.OpNone) + require.Error(t, err, "put owner key failed, cmp is false") + op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, DDLOwnerKey, "log prefix") + require.NotNil(t, err) + require.Equal(t, concurrency.ErrElectionNoLeader.Error(), err.Error()) + require.Equal(t, op, owner.OpNone) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/owner/MockDelOwnerKey")) + + // Let ddl run for the owner again. + require.NoError(t, tInfo.ddl.OwnerManager().CampaignOwner()) + isOwner = checkOwner(tInfo.ddl, true) + require.True(t, isOwner) + // Mock the manager become not owner because the owner is deleted(like TTL is timeout). + // And then the manager campaigns the owner again, and become the owner. + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/owner/MockDelOwnerKey", `return("onlyDelOwnerKey")`)) + err = manager.SetOwnerOpValue(context.Background(), owner.OpGetUpgradingState) + require.Error(t, err, "put owner key failed, cmp is false") + isOwner = checkOwner(tInfo.ddl, true) + require.True(t, isOwner) + op, err = owner.GetOwnerOpValue(context.Background(), tInfo.client, DDLOwnerKey, "log prefix") + require.NoError(t, err) + require.Equal(t, op, owner.OpNone) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/owner/MockDelOwnerKey")) } func TestCluster(t *testing.T) { @@ -103,27 +188,9 @@ func TestCluster(t *testing.T) { owner.ManagerSessionTTL = originalTTL }() - store, err := mockstore.NewMockStore() - require.NoError(t, err) - defer func() { - err := store.Close() - require.NoError(t, err) - }() - - cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 4}) - defer cluster.Terminate(t) - - cli := cluster.Client(0) - ic := infoschema.NewCache(2) - ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 0), 0) - d := NewDDL( - context.Background(), - WithEtcdClient(cli), - WithStore(store), - WithLease(testLease), - WithInfoCache(ic), - ) - + tInfo := newTestInfo(t) + store, cluster, d := tInfo.store, tInfo.cluster, tInfo.ddl + defer tInfo.Close(t) require.NoError(t, d.OwnerManager().CampaignOwner()) isOwner := checkOwner(d, true) @@ -146,7 +213,7 @@ func TestCluster(t *testing.T) { // Delete the leader key, the d1 become the owner. cliRW := cluster.Client(2) - err = deleteLeader(cliRW, DDLOwnerKey) + err := deleteLeader(cliRW, DDLOwnerKey) require.NoError(t, err) isOwner = checkOwner(d, false) @@ -173,14 +240,13 @@ func TestCluster(t *testing.T) { // Cancel the owner context, there is no owner. d1.OwnerManager().Cancel() - session, err := concurrency.NewSession(cliRW) - require.NoError(t, err) - - election := concurrency.NewElection(session, DDLOwnerKey) logPrefix := fmt.Sprintf("[ddl] %s ownerManager %s", DDLOwnerKey, "useless id") logCtx := logutil.WithKeyValue(context.Background(), "owner info", logPrefix) - _, err = owner.GetOwnerInfo(context.Background(), logCtx, election, "useless id") + _, err = owner.GetOwnerKey(context.Background(), logCtx, cliRW, DDLOwnerKey, "useless id") + require.Truef(t, terror.ErrorEqual(err, concurrency.ErrElectionNoLeader), "get owner info result don't match, err %v", err) + op, err := owner.GetOwnerOpValue(context.Background(), cliRW, DDLOwnerKey, logPrefix) require.Truef(t, terror.ErrorEqual(err, concurrency.ErrElectionNoLeader), "get owner info result don't match, err %v", err) + require.Equal(t, op, owner.OpNone) } func checkOwner(d DDL, fbVal bool) (isOwner bool) { diff --git a/owner/mock.go b/owner/mock.go index 48f8e1e2b9775..f5eb8954e0acb 100644 --- a/owner/mock.go +++ b/owner/mock.go @@ -17,8 +17,10 @@ package owner import ( "context" "sync/atomic" + "time" "github.com/pingcap/errors" + "github.com/pingcap/tidb/ddl/util" ) var _ Manager = &mockManager{} @@ -77,6 +79,10 @@ func (m *mockManager) GetOwnerID(_ context.Context) (string, error) { return "", errors.New("no owner") } +func (*mockManager) SetOwnerOpValue(_ context.Context, _ OpType) error { + return nil +} + // CampaignOwner implements Manager.CampaignOwner interface. func (m *mockManager) CampaignOwner() error { m.toBeOwner() @@ -104,3 +110,46 @@ func (m *mockManager) SetBeOwnerHook(hook func()) { func (*mockManager) CampaignCancel() { // do nothing } + +func mockDelOwnerKey(mockCal, ownerKey string, m *ownerManager) error { + checkIsOwner := func(m *ownerManager, checkTrue bool) error { + // 5s + for i := 0; i < 100; i++ { + if m.IsOwner() == checkTrue { + break + } + time.Sleep(50 * time.Millisecond) + } + if m.IsOwner() != checkTrue { + return errors.Errorf("expect manager state:%v", checkTrue) + } + return nil + } + + needCheckOwner := false + switch mockCal { + case "delOwnerKeyAndNotOwner": + m.CampaignCancel() + // Make sure the manager is not owner. And it will exit campaignLoop. + err := checkIsOwner(m, false) + if err != nil { + return err + } + case "onlyDelOwnerKey": + needCheckOwner = true + } + + err := util.DeleteKeyFromEtcd(ownerKey, m.etcdCli, 1, keyOpDefaultTimeout) + if err != nil { + return errors.Trace(err) + } + if needCheckOwner { + // Mock the manager become not owner because the owner is deleted(like TTL is timeout). + // And then the manager campaigns the owner again, and become the owner. + err = checkIsOwner(m, true) + if err != nil { + return err + } + } + return nil +}