feat: improve mysql ha (#4634)
xuriwuyun authored Aug 14, 2023
1 parent 600eb86 commit 31903e8
Showing 11 changed files with 828 additions and 168 deletions.
40 changes: 14 additions & 26 deletions cmd/probe/internal/binding/mysql/mysql.go
@@ -32,12 +32,14 @@ import (
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
"github.com/pkg/errors"
"github.com/spf13/viper"
"golang.org/x/exp/slices"

. "github.com/apecloud/kubeblocks/cmd/probe/internal"
. "github.com/apecloud/kubeblocks/cmd/probe/internal/binding"
"github.com/apecloud/kubeblocks/cmd/probe/internal/component/mysql"
"github.com/apecloud/kubeblocks/cmd/probe/internal/dcs"
"github.com/apecloud/kubeblocks/internal/constant"
. "github.com/apecloud/kubeblocks/internal/sqlchannel/util"
)

@@ -115,11 +117,19 @@ func (mysqlOps *MysqlOperations) Init(metadata bindings.Metadata) error {
}

func (mysqlOps *MysqlOperations) GetRole(ctx context.Context, request *bindings.InvokeRequest, response *bindings.InvokeResponse) (string, error) {
workloadType := request.Metadata[WorkloadTypeKey]
workloadType := viper.GetString(constant.KBEnvWorkloadType)
if strings.EqualFold(workloadType, Replication) {
return mysqlOps.GetRoleForReplication(ctx, request, response)
dcsStore := dcs.GetStore()
if dcsStore == nil {
return "", nil
}
k8sStore := dcsStore.(*dcs.KubernetesStore)
cluster := k8sStore.GetClusterFromCache()
if cluster == nil || !cluster.IsLocked() {
return "", nil
}
}
return mysqlOps.GetRoleForConsensus(ctx, request, response)
return mysqlOps.manager.GetRole(ctx)
}

func (mysqlOps *MysqlOperations) GetRunningPort() int {
@@ -139,29 +149,7 @@ func (mysqlOps *MysqlOperations) GetRoleForReplication(ctx context.Context, requ
return SECONDARY, nil
}

getReadOnlySQL := `show global variables like 'read_only';`
data, err := mysqlOps.query(ctx, getReadOnlySQL)
if err != nil {
mysqlOps.Logger.Infof("error executing %s: %v", getReadOnlySQL, err)
return "", errors.Wrapf(err, "error executing %s", getReadOnlySQL)
}

queryRes := &QueryRes{}
err = json.Unmarshal(data, queryRes)
if err != nil {
return "", errors.Errorf("parse query failed, err:%v", err)
}

for _, mapVal := range *queryRes {
if mapVal["Variable_name"] == "read_only" {
if mapVal["Value"].(string) == "OFF" {
return PRIMARY, nil
} else if mapVal["Value"].(string) == "ON" {
return SECONDARY, nil
}
}
}
return "", errors.Errorf("parse query failed, no records")
return PRIMARY, nil
}

func (mysqlOps *MysqlOperations) GetRoleForConsensus(ctx context.Context, request *bindings.InvokeRequest, response *bindings.InvokeResponse) (string, error) {
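
For context on the change above: in replication mode, GetRole now waits for the cluster to be locked in the DCS store before reporting a role, and otherwise delegates to the manager's GetRole. Below is a minimal sketch of that guard, pulled out as a standalone helper for illustration; the helper name roleDetectionReady is assumed, while dcs.GetStore, dcs.KubernetesStore, GetClusterFromCache, and IsLocked all appear in this commit.

```go
package example

import "github.com/apecloud/kubeblocks/cmd/probe/internal/dcs"

// roleDetectionReady mirrors the guard added to GetRole: for replication
// workloads, role detection is skipped until the cluster lock is visible
// in the cached DCS (Kubernetes) store.
func roleDetectionReady() bool {
	store := dcs.GetStore()
	if store == nil {
		return false
	}
	k8sStore, ok := store.(*dcs.KubernetesStore)
	if !ok {
		return false
	}
	cluster := k8sStore.GetClusterFromCache()
	return cluster != nil && cluster.IsLocked()
}
```
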
65 changes: 54 additions & 11 deletions cmd/probe/internal/component/dbmanager.go
@@ -38,14 +38,24 @@ type DBManager interface {
// Functions related to cluster initialization.
InitializeCluster(context.Context, *dcs.Cluster) error
IsClusterInitialized(context.Context, *dcs.Cluster) (bool, error)
// IsCurrentMemberInCluster checks whether the current member is configured in the cluster, which is meaningful for consensus clusters.
// For replication clusters it always returns true.
IsCurrentMemberInCluster(context.Context, *dcs.Cluster) bool

// Functions related to cluster health checks.
IsCurrentMemberHealthy(context.Context) bool
// IsClusterHealthy performs the cluster health check and applies only to consensus clusters.
// For replication clusters IsClusterHealthy always returns true,
// since the cluster's health is equivalent to the leader member's health.
IsClusterHealthy(context.Context, *dcs.Cluster) bool

// Member health checks
IsMemberHealthy(context.Context, *dcs.Cluster, *dcs.Member) bool
IsCurrentMemberHealthy(context.Context, *dcs.Cluster) bool
IsMemberLagging(context.Context, *dcs.Cluster, *dcs.Member) bool
GetDBState(context.Context, *dcs.Cluster, *dcs.Member) *dcs.DBState

// HasOtherHealthyLeader applies only to consensus clusters,
// where the DB's internal role serves as the source of truth.
// For replication clusters, HasOtherHealthyLeader always returns nil.
HasOtherHealthyLeader(context.Context, *dcs.Cluster) *dcs.Member
HasOtherHealthyMembers(context.Context, *dcs.Cluster, string) []*dcs.Member

@@ -57,11 +67,17 @@ type DBManager interface {
AddCurrentMemberToCluster(*dcs.Cluster) error
DeleteMemberFromCluster(*dcs.Cluster, string) error

// IsPromoted applies only to consensus clusters and is used to check
// whether the DB has completed the switchover.
// For replication clusters it always returns true.
IsPromoted(context.Context) bool
// Functions related to HA
Promote() error
Demote() error
Follow(*dcs.Cluster) error
Recover()
// These functions should be idempotent: once they have been executed in one HA cycle,
// any subsequent calls during that cycle have no effect.
Promote(context.Context) error
Demote(context.Context) error
Follow(context.Context, *dcs.Cluster) error
Recover(context.Context) error

GetHealthiestMember(*dcs.Cluster, string) *dcs.Member
// IsHealthiestMember(*dcs.Cluster) bool
@@ -89,6 +105,8 @@ type DBManagerBase struct {
DataDir string
Logger logger.Logger
DBStartupReady bool
IsLocked bool
DBState *dcs.DBState
}

func (mgr *DBManagerBase) IsDBStartupReady() bool {
@@ -107,6 +125,26 @@ func (mgr *DBManagerBase) IsFirstMember() bool {
return strings.HasSuffix(mgr.CurrentMemberName, "-0")
}

func (mgr *DBManagerBase) IsPromoted(context.Context) bool {
return true
}

func (mgr *DBManagerBase) IsClusterHealthy(context.Context, *dcs.Cluster) bool {
return true
}

func (mgr *DBManagerBase) HasOtherHealthyLeader(ctx context.Context, cluster *dcs.Cluster) *dcs.Member {
return nil
}

func (mgr *DBManagerBase) IsMemberLagging(ctx context.Context, cluster *dcs.Cluster, member *dcs.Member) bool {
return false
}

func (mgr *DBManagerBase) GetDBState(ctx context.Context, cluster *dcs.Cluster, member *dcs.Member) *dcs.DBState {
return nil
}

func RegisterManager(characterType string, manager DBManager) {
characterType = strings.ToLower(characterType)
managers[characterType] = manager
@@ -151,7 +189,7 @@ func (*FakeManager) IsCurrentMemberInCluster(context.Context, *dcs.Cluster) bool
return true
}

func (*FakeManager) IsCurrentMemberHealthy(context.Context) bool {
func (*FakeManager) IsCurrentMemberHealthy(context.Context, *dcs.Cluster) bool {
return true
}

@@ -191,19 +229,24 @@ func (*FakeManager) DeleteMemberFromCluster(*dcs.Cluster, string) error {
return fmt.Errorf("NotSuppported")
}

func (*FakeManager) Promote() error {
func (*FakeManager) Promote(context.Context) error {
return fmt.Errorf("NotSupported")
}

func (*FakeManager) Demote() error {
func (*FakeManager) IsPromoted(context.Context) bool {
return true
}

func (*FakeManager) Demote(context.Context) error {
return fmt.Errorf("NotSuppported")
}

func (*FakeManager) Follow(*dcs.Cluster) error {
func (*FakeManager) Follow(context.Context, *dcs.Cluster) error {
return fmt.Errorf("NotSupported")
}

func (*FakeManager) Recover() {
func (*FakeManager) Recover(context.Context) error {
return nil

}

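
The comments in the DBManager interface above state that the HA operations (Promote, Demote, Follow, Recover) are meant to be idempotent within one HA cycle, with IsPromoted reporting whether the switchover has completed. A minimal sketch of how an HA cycle could lean on that contract, assuming a hypothetical promoteIfNeeded helper (only the interface methods themselves come from this commit):

```go
package example

import (
	"context"
	"fmt"

	"github.com/apecloud/kubeblocks/cmd/probe/internal/component"
	"github.com/apecloud/kubeblocks/cmd/probe/internal/dcs"
)

// promoteIfNeeded is a hypothetical helper: Promote can be retried safely
// because it is idempotent within the cycle, and IsPromoted tells the cycle
// whether the switchover has actually completed.
func promoteIfNeeded(ctx context.Context, mgr component.DBManager, cluster *dcs.Cluster) error {
	if !mgr.IsClusterHealthy(ctx, cluster) {
		return fmt.Errorf("cluster is not healthy, skipping promotion")
	}
	if mgr.IsPromoted(ctx) {
		// Already promoted earlier in this cycle; nothing more to do.
		return nil
	}
	if err := mgr.Promote(ctx); err != nil {
		return err
	}
	if !mgr.IsPromoted(ctx) {
		return fmt.Errorf("promotion issued but switchover not yet complete")
	}
	return nil
}
```
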
74 changes: 55 additions & 19 deletions cmd/probe/internal/component/mongodb/manager.go
@@ -39,6 +39,11 @@ import (
"github.com/apecloud/kubeblocks/cmd/probe/internal/dcs"
)

const (
PrimaryPriority = 2
SecondaryPriority = 1
)

type Manager struct {
component.DBManagerBase
Client *mongo.Client
@@ -104,9 +109,9 @@ func (mgr *Manager) InitiateReplSet(ctx context.Context, cluster *dcs.Cluster) e
configMembers[i].ID = i
configMembers[i].Host = cluster.GetMemberAddrWithPort(member)
if strings.HasPrefix(member.Name, mgr.CurrentMemberName) {
configMembers[i].Priority = 2
configMembers[i].Priority = PrimaryPriority
} else {
configMembers[i].Priority = 1
configMembers[i].Priority = SecondaryPriority
}
}

@@ -419,8 +424,8 @@ func (mgr *Manager) IsCurrentMemberInCluster(ctx context.Context, cluster *dcs.C
return false
}

func (mgr *Manager) IsCurrentMemberHealthy(ctx context.Context) bool {
return mgr.IsMemberHealthy(ctx, nil, nil)
func (mgr *Manager) IsCurrentMemberHealthy(ctx context.Context, cluster *dcs.Cluster) bool {
return mgr.IsMemberHealthy(ctx, cluster, nil)
}

func (mgr *Manager) IsMemberHealthy(ctx context.Context, cluster *dcs.Cluster, member *dcs.Member) bool {
@@ -444,7 +449,9 @@ func (mgr *Manager) IsMemberHealthy(ctx context.Context, cluster *dcs.Cluster, m
return false
}

func (mgr *Manager) Recover() {}
func (mgr *Manager) Recover(context.Context) error {
return nil
}

func (mgr *Manager) AddCurrentMemberToCluster(cluster *dcs.Cluster) error {
client, err := mgr.GetReplSetClient(context.TODO(), cluster)
@@ -470,7 +477,7 @@ func (mgr *Manager) AddCurrentMemberToCluster(cluster *dcs.Cluster) error {
}
configMember.ID = lastID + 1
configMember.Host = currentHost
configMember.Priority = 1
configMember.Priority = SecondaryPriority
rsConfig.Members = append(rsConfig.Members, configMember)

rsConfig.Version++
@@ -519,38 +526,65 @@ func (mgr *Manager) IsClusterHealthy(ctx context.Context, cluster *dcs.Cluster)
return status.OK != 0
}

func (mgr *Manager) Promote() error {
rsConfig, err := mgr.GetReplSetConfig(context.TODO())
func (mgr *Manager) IsPromoted(ctx context.Context) bool {
isLeader, err := mgr.IsLeader(ctx, nil)
if err != nil || !isLeader {
mgr.Logger.Errorf("Is leader check failed: %v", err)
return false
}

rsConfig, err := mgr.GetReplSetConfig(ctx)
if rsConfig == nil {
mgr.Logger.Errorf("Get replSet config failed: %v", err)
return err
return false
}
for i := range rsConfig.Members {
if strings.HasPrefix(rsConfig.Members[i].Host, mgr.CurrentMemberName) {
if rsConfig.Members[i].Priority == PrimaryPriority {
return true
}
}
}
return false
}

hosts := mgr.GetMemberAddrsFromRSConfig(rsConfig)
client, err := NewReplSetClient(context.TODO(), hosts)
if err != nil {
func (mgr *Manager) Promote(ctx context.Context) error {
rsConfig, err := mgr.GetReplSetConfig(ctx)
if rsConfig == nil {
mgr.Logger.Errorf("Get replSet config failed: %v", err)
return err
}
defer client.Disconnect(context.TODO()) //nolint:errcheck

for i := range rsConfig.Members {
if strings.HasPrefix(rsConfig.Members[i].Host, mgr.CurrentMemberName) {
rsConfig.Members[i].Priority = 2
} else if rsConfig.Members[i].Priority == 2 {
rsConfig.Members[i].Priority = 1
if rsConfig.Members[i].Priority == PrimaryPriority {
mgr.Logger.Debugf("Current member already has the highest priority!")
return nil
}

rsConfig.Members[i].Priority = PrimaryPriority
} else if rsConfig.Members[i].Priority == PrimaryPriority {
rsConfig.Members[i].Priority = SecondaryPriority
}
}

rsConfig.Version++
return SetReplSetConfig(context.TODO(), client, rsConfig)

hosts := mgr.GetMemberAddrsFromRSConfig(rsConfig)
client, err := NewReplSetClient(ctx, hosts)
if err != nil {
return err
}
defer client.Disconnect(ctx) //nolint:errcheck
return SetReplSetConfig(ctx, client, rsConfig)
}

func (mgr *Manager) Demote() error {
func (mgr *Manager) Demote(context.Context) error {
// MongoDB performs promote and demote in a single action, so there is nothing to do here.
return nil
}

func (mgr *Manager) Follow(cluster *dcs.Cluster) error {
func (mgr *Manager) Follow(ctx context.Context, cluster *dcs.Cluster) error {
return nil
}

@@ -661,6 +695,7 @@ func (mgr *Manager) Lock(ctx context.Context, reason string) error {
err := errors.Errorf("mongo says: %s", lockResp.Errmsg)
return err
}
mgr.IsLocked = true
mgr.Logger.Infof("Lock db success times: %d", lockResp.LockCount)
return nil
}
@@ -694,6 +729,7 @@ func (mgr *Manager) Unlock(ctx context.Context) error {
return err
}
}
mgr.IsLocked = false
mgr.Logger.Infof("Unlock db success")
return nil
}
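
As shown above, the MongoDB manager promotes by reassigning replica-set member priorities: Promote raises the current member to PrimaryPriority and lowers whichever member held it, and IsPromoted checks both leadership and priority. Below is a simplified sketch of that reshuffle on a toy member type; rsMember and promoteByPriority are illustrative only, while the priority constants match the ones introduced in this commit.

```go
package example

import "strings"

const (
	PrimaryPriority   = 2
	SecondaryPriority = 1
)

// rsMember is a simplified stand-in for a replica-set config member.
type rsMember struct {
	Host     string
	Priority int
}

// promoteByPriority mirrors Promote above: the target member gets
// PrimaryPriority and any other member holding PrimaryPriority is lowered
// to SecondaryPriority, so the next election favors the target.
func promoteByPriority(members []rsMember, target string) {
	for i := range members {
		switch {
		case strings.HasPrefix(members[i].Host, target):
			members[i].Priority = PrimaryPriority
		case members[i].Priority == PrimaryPriority:
			members[i].Priority = SecondaryPriority
		}
	}
}
```
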
