Skip to content

Commit

Permalink
owner: add setting/getting the owner value info (#43353)
Browse files Browse the repository at this point in the history
close #43352
  • Loading branch information
zimulala authored May 4, 2023
1 parent 7eff8ce commit 10776b3
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 60 deletions.
1 change: 1 addition & 0 deletions metrics/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var (
WatcherClosed = "watcher_closed"
Cancelled = "cancelled"
Deleted = "deleted"
PutValue = "put_value"
SessionDone = "session_done"
CtxDone = "context_done"
WatchOwnerCounter *prometheus.CounterVec
Expand Down
5 changes: 4 additions & 1 deletion owner/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ go_library(
importpath = "github.com/pingcap/tidb/owner",
visibility = ["//visibility:public"],
deps = [
"//ddl/util",
"//metrics",
"//parser/terror",
"//util",
"//util/logutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@io_etcd_go_etcd_api_v3//mvccpb",
"@io_etcd_go_etcd_api_v3//v3rpc/rpctypes",
"@io_etcd_go_etcd_client_v3//:client",
Expand All @@ -32,10 +34,11 @@ go_test(
],
embed = [":owner"],
flaky = True,
shard_count = 3,
shard_count = 4,
deps = [
"//ddl",
"//infoschema",
"//kv",
"//parser/terror",
"//store/mockstore",
"//testkit/testsetup",
Expand Down
122 changes: 108 additions & 14 deletions owner/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package owner

import (
"bytes"
"context"
"fmt"
"os"
Expand All @@ -25,6 +26,7 @@ import (
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/terror"
util2 "github.com/pingcap/tidb/util"
Expand All @@ -46,6 +48,8 @@ type Manager interface {
RetireOwner()
// GetOwnerID gets the owner ID.
GetOwnerID(ctx context.Context) (string, error)
// SetOwnerOpValue updates the owner op value.
SetOwnerOpValue(ctx context.Context, op OpType) error
// CampaignOwner campaigns the owner.
CampaignOwner() error
// ResignOwner lets the owner start a new election.
Expand All @@ -65,6 +69,25 @@ const (
keyOpDefaultTimeout = 5 * time.Second
)

// OpType is the owner key value operation type.
type OpType byte

// List operation of types.
const (
OpNone OpType = 0
OpGetUpgradingState OpType = 1
)

// String implements fmt.Stringer interface.
func (ot OpType) String() string {
switch ot {
case OpGetUpgradingState:
return "get upgrading state"
default:
return "none"
}
}

// DDLOwnerChecker is used to check whether tidb is owner.
type DDLOwnerChecker interface {
// IsOwner returns whether the ownerManager is the owner.
Expand Down Expand Up @@ -225,6 +248,12 @@ func (m *ownerManager) campaignLoop(etcdSession *concurrency.Session) {
return
}
case <-campaignContext.Done():
failpoint.Inject("MockDelOwnerKey", func(v failpoint.Value) {
if v.(string) == "delOwnerKeyAndNotOwner" {
logutil.Logger(logCtx).Info("mock break campaign and don't clear related info")
return
}
})
logutil.Logger(logCtx).Info("break campaign loop, context is done")
m.revokeSession(logPrefix, etcdSession.Lease())
return
Expand All @@ -248,7 +277,7 @@ func (m *ownerManager) campaignLoop(etcdSession *concurrency.Session) {
continue
}

ownerKey, err := GetOwnerInfo(campaignContext, logCtx, elec, m.id)
ownerKey, err := GetOwnerKey(campaignContext, logCtx, m.etcdCli, m.key, m.id)
if err != nil {
continue
}
Expand All @@ -274,32 +303,97 @@ func (m *ownerManager) revokeSession(_ string, leaseID clientv3.LeaseID) {

// GetOwnerID implements Manager.GetOwnerID interface.
func (m *ownerManager) GetOwnerID(ctx context.Context) (string, error) {
resp, err := m.etcdCli.Get(ctx, m.key, clientv3.WithFirstCreate()...)
_, ownerID, _, _, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key)
return string(ownerID), errors.Trace(err)
}

func getOwnerInfo(ctx, logCtx context.Context, etcdCli *clientv3.Client, ownerPath string) (string, []byte, OpType, int64, error) {
var op OpType
resp, err := etcdCli.Get(ctx, ownerPath, clientv3.WithFirstCreate()...)
if err != nil {
return "", errors.Trace(err)
logutil.Logger(logCtx).Info("failed to get leader", zap.Error(err))
return "", nil, op, 0, errors.Trace(err)
}
if len(resp.Kvs) == 0 {
return "", concurrency.ErrElectionNoLeader
return "", nil, op, 0, concurrency.ErrElectionNoLeader
}
return string(resp.Kvs[0].Value), nil

var ownerID []byte
ownerID, op = splitOwnerValues(resp.Kvs[0].Value)
logutil.Logger(logCtx).Info("get owner", zap.ByteString("owner key", resp.Kvs[0].Key),
zap.ByteString("ownerID", ownerID), zap.Stringer("op", op))
return string(resp.Kvs[0].Key), ownerID, op, resp.Kvs[0].ModRevision, nil
}

// GetOwnerInfo gets the owner information.
func GetOwnerInfo(ctx, logCtx context.Context, elec *concurrency.Election, id string) (string, error) {
resp, err := elec.Leader(ctx)
// GetOwnerKey gets the owner key information.
func GetOwnerKey(ctx, logCtx context.Context, etcdCli *clientv3.Client, etcdKey, id string) (string, error) {
ownerKey, ownerID, _, _, err := getOwnerInfo(ctx, logCtx, etcdCli, etcdKey)
if err != nil {
// If no leader elected currently, it returns ErrElectionNoLeader.
logutil.Logger(logCtx).Info("failed to get leader", zap.Error(err))
return "", errors.Trace(err)
}
ownerID := string(resp.Kvs[0].Value)
logutil.Logger(logCtx).Info("get owner", zap.String("ownerID", ownerID))
if ownerID != id {
if string(ownerID) != id {
logutil.Logger(logCtx).Warn("is not the owner")
return "", errors.New("ownerInfoNotMatch")
}

return string(resp.Kvs[0].Key), nil
return ownerKey, nil
}

func splitOwnerValues(val []byte) ([]byte, OpType) {
vals := bytes.Split(val, []byte("_"))
var op OpType
if len(vals) == 2 {
op = OpType(vals[1][0])
}
return vals[0], op
}

func joinOwnerValues(vals ...[]byte) []byte {
return bytes.Join(vals, []byte("_"))
}

// SetOwnerOpValue implements Manager.SetOwnerOpValue interface.
func (m *ownerManager) SetOwnerOpValue(ctx context.Context, op OpType) error {
// owner don't change.
ownerKey, ownerID, currOp, modRevision, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key)
if err != nil {
return errors.Trace(err)
}
if currOp == op {
logutil.Logger(m.logCtx).Info("set owner op is the same as the original, so do nothing.", zap.Stringer("op", op))
return nil
}
if string(ownerID) != m.id {
return errors.New("ownerInfoNotMatch")
}
newOwnerVal := joinOwnerValues(ownerID, []byte{byte(op)})

failpoint.Inject("MockDelOwnerKey", func(v failpoint.Value) {
if valStr, ok := v.(string); ok {
if err := mockDelOwnerKey(valStr, ownerKey, m); err != nil {
failpoint.Return(err)
}
}
})

resp, err := m.etcdCli.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(ownerKey), "=", modRevision)).
Then(clientv3.OpPut(ownerKey, string(newOwnerVal))).
Commit()
logutil.BgLogger().Info("set owner op value", zap.String("owner key", ownerKey), zap.ByteString("ownerID", ownerID),
zap.Stringer("old Op", currOp), zap.Stringer("op", op), zap.Bool("isSuc", resp.Succeeded), zap.Error(err))
if !resp.Succeeded {
err = errors.New("put owner key failed, cmp is false")
}
metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.PutValue+"_"+metrics.RetLabel(err)).Inc()
return errors.Trace(err)
}

// GetOwnerOpValue gets the owner op value.
func GetOwnerOpValue(ctx context.Context, etcdCli *clientv3.Client, ownerPath, logPrefix string) (OpType, error) {
logCtx := logutil.WithKeyValue(context.Background(), "owner info", logPrefix)
_, _, op, _, err := getOwnerInfo(ctx, logCtx, etcdCli, ownerPath)
return op, errors.Trace(err)
}

func (m *ownerManager) watchOwner(ctx context.Context, etcdSession *concurrency.Session, key string) {
Expand Down
Loading

0 comments on commit 10776b3

Please sign in to comment.