chore: improve Lorry performance #6109

Merged
6 commits merged on Dec 18, 2023
2 changes: 2 additions & 0 deletions cmd/lorry/main.go
@@ -31,6 +31,7 @@ import (
"github.com/spf13/pflag"
"go.uber.org/automaxprocs/maxprocs"
"go.uber.org/zap"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
kzap "sigs.k8s.io/controller-runtime/pkg/log/zap"

@@ -60,6 +61,7 @@ func main() {
Development: true,
}
opts.BindFlags(flag.CommandLine)
klog.InitFlags(nil)
pflag.CommandLine.AddGoFlagSet(flag.CommandLine)
pflag.Parse()
err := viper.BindPFlags(pflag.CommandLine)
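
The new klog.InitFlags(nil) call registers klog's flags (-v, -logtostderr, and friends) on the standard flag.CommandLine set, which main() then merges into pflag via AddGoFlagSet; without it, the klog output that client-go emits cannot be tuned from Lorry's command line. A minimal sketch of the same wiring as a hypothetical standalone program, not the PR's code:

package main

import (
	"flag"

	"github.com/spf13/pflag"
	"k8s.io/klog/v2"
)

func main() {
	// Register klog's flags (-v, -logtostderr, ...) on the standard flag set.
	klog.InitFlags(nil)
	// Merge the standard flags into pflag so both flag families parse together.
	pflag.CommandLine.AddGoFlagSet(flag.CommandLine)
	pflag.Parse()
	// Emitted only when the program is run with -v=1 or higher.
	klog.V(1).Info("verbose logging enabled")
}
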
32 changes: 21 additions & 11 deletions pkg/lorry/dcs/k8s.go
@@ -166,24 +166,29 @@ func (store *KubernetesStore) GetCluster() (*Cluster, error) {
}
}

- members, err := store.GetMembers()
- if err != nil {
- store.logger.Info("get members error", "error", err)
+ var members []Member
+ if store.cluster != nil && int(replicas) == len(store.cluster.Members) {
+ members = store.cluster.Members
+ } else {
+ members, err = store.GetMembers()
+ if err != nil {
+ return nil, err
+ }
}

leader, err := store.GetLeader()
if err != nil {
store.logger.Info("get leader error", "error", err)
store.logger.Info("get leader failed", "error", err)
}

switchover, err := store.GetSwitchover()
if err != nil {
store.logger.Info("get switchover error", "error", err)
store.logger.Info("get switchover failed", "error", err)
}

haConfig, err := store.GetHaConfig()
if err != nil {
store.logger.Info("get HaConfig error", "error", err)
store.logger.Info("get HaConfig failed", "error", err)
}

cluster := &Cluster{
@@ -352,7 +357,8 @@ func (store *KubernetesStore) DeleteLeader() error {
}

func (store *KubernetesStore) AttemptAcquireLease() error {
- now := strconv.FormatInt(time.Now().Unix(), 10)
+ timestamp := time.Now().Unix()
+ now := strconv.FormatInt(timestamp, 10)
ttl := store.cluster.HaConfig.ttl
leaderName := store.currentMemberName
annotation := map[string]string{
@@ -371,11 +377,14 @@ func (store *KubernetesStore) AttemptAcquireLease() error {
cm, err := store.clientset.CoreV1().ConfigMaps(store.namespace).Update(context.TODO(), configMap, metav1.UpdateOptions{})
if err != nil {
store.logger.Error(err, "Acquire lease failed")
- } else {
- store.cluster.Leader.Resource = cm
+ return err
}

- return err
+ store.cluster.Leader.Resource = cm
+ store.cluster.Leader.Name = leaderName
+ store.cluster.Leader.AcquireTime = timestamp
+ store.cluster.Leader.RenewTime = timestamp
+ return nil
}

func (store *KubernetesStore) HasLease() bool {
@@ -407,6 +416,7 @@ func (store *KubernetesStore) ReleaseLease() error {
store.logger.Info("release lease")
configMap := store.cluster.Leader.Resource.(*corev1.ConfigMap)
configMap.Annotations["leader"] = ""
store.cluster.Leader.Name = ""

if store.cluster.Leader.DBState != nil {
str, _ := json.Marshal(store.cluster.Leader.DBState)
@@ -628,7 +638,7 @@ func getDBPort(pod *corev1.Pod) string {
func getLorryPort(pod *corev1.Pod) string {
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.Name == "probe-http-port" {
if port.Name == constant.LorryHTTPPortName {
return strconv.Itoa(int(port.ContainerPort))
}
}
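
The GetCluster change above is the main saving in this file: when the cached cluster already holds as many members as the current replica count, the pod-listing round trip to the API server is skipped, and AttemptAcquireLease now fills in the leader fields from the ConfigMap it just wrote instead of re-reading the leader afterwards. A rough sketch of the cache-on-match idea, with an illustrative Member type and fetch callback rather than Lorry's actual API:

package example

// Member is a minimal stand-in for a cluster member record.
type Member struct{ Name string }

// memberCache reuses the previously fetched member list while the expected
// replica count still matches, and refetches only on a mismatch.
type memberCache struct {
	members []Member
}

func (c *memberCache) get(replicas int, fetch func() ([]Member, error)) ([]Member, error) {
	if c.members != nil && replicas == len(c.members) {
		return c.members, nil // fast path: no API-server call
	}
	m, err := fetch() // slow path, e.g. listing pods
	if err != nil {
		return nil, err
	}
	c.members = m
	return m, nil
}

The trade-off is freshness: membership changes that do not alter the member count are only picked up when something else refreshes the cached cluster.
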
4 changes: 4 additions & 0 deletions pkg/lorry/dcs/types.go
@@ -98,6 +98,10 @@ func (c *Cluster) GetMemberAddr(member Member) string {
return fmt.Sprintf("%s.%s-headless.%s.svc.%s", member.Name, c.ClusterCompName, c.Namespace, clusterDomain)
}

func (c *Cluster) GetMemberShortAddr(member Member) string {
return fmt.Sprintf("%s.%s-headless.%s.svc", member.Name, c.ClusterCompName, c.Namespace)
}

func (c *Cluster) GetMemberAddrs() []string {
hosts := make([]string, len(c.Members))
for i, member := range c.Members {
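
GetMemberShortAddr exists for the MySQL 5.7 limitation noted later in this PR: CHANGE MASTER TO rejects a master_host longer than 60 characters, and the fully qualified name from GetMemberAddr appends the cluster domain (typically cluster.local), which can push a long cluster name over that limit. The short form still resolves in-cluster through the resolver's search path. Roughly, with made-up pod, component, and namespace names:

package main

import "fmt"

func main() {
	// Hypothetical values for illustration only.
	name, comp, ns, domain := "mysql-0", "demo-mysql", "default", "cluster.local"

	full := fmt.Sprintf("%s.%s-headless.%s.svc.%s", name, comp, ns, domain)
	short := fmt.Sprintf("%s.%s-headless.%s.svc", name, comp, ns)

	fmt.Println(len(full), full)   // 53 mysql-0.demo-mysql-headless.default.svc.cluster.local
	fmt.Println(len(short), short) // 39 mysql-0.demo-mysql-headless.default.svc
}
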
93 changes: 75 additions & 18 deletions pkg/lorry/engines/mysql/manager.go
@@ -97,11 +97,11 @@ func (mgr *Manager) IsRunning() bool {
if errors.As(err, &driverErr) {
// Now the error number is accessible directly
if driverErr.Number == 1040 {
mgr.Logger.Error(err, "Too many connections")
mgr.Logger.Info("connect failed: Too many connections")
return true
}
}
mgr.Logger.Error(err, "DB is not ready")
mgr.Logger.Info("DB is not ready", "error", err)
return false
}

@@ -118,7 +118,7 @@ func (mgr *Manager) IsDBStartupReady() bool {
// test if db is ready to connect or not
err := mgr.DB.PingContext(ctx)
if err != nil {
mgr.Logger.Error(err, "DB is not ready")
mgr.Logger.Info("DB is not ready", "error", err)
return false
}

@@ -212,13 +212,13 @@ func (mgr *Manager) IsMemberLagging(ctx context.Context, cluster *dcs.Cluster, m

db, err := mgr.GetMemberConnection(cluster, member)
if err != nil {
mgr.Logger.Error(err, "Get Member conn failed")
mgr.Logger.Info("Get Member conn failed", "error", err)
return false, 0
}

opTimestamp, err := mgr.GetOpTimestamp(ctx, db)
if err != nil {
mgr.Logger.Error(err, "get op timestamp failed")
mgr.Logger.Info("get op timestamp failed", "error", err)
return false, 0
}

@@ -232,7 +232,7 @@
func (mgr *Manager) IsMemberHealthy(ctx context.Context, cluster *dcs.Cluster, member *dcs.Member) bool {
db, err := mgr.GetMemberConnection(cluster, member)
if err != nil {
mgr.Logger.Error(err, "Get Member conn failed")
mgr.Logger.Info("Get Member conn failed", "error", err)
return false
}

@@ -253,25 +253,25 @@ func (mgr *Manager) GetDBState(ctx context.Context, cluster *dcs.Cluster) *dcs.D

globalState, err := mgr.GetGlobalState(ctx, mgr.DB)
if err != nil {
mgr.Logger.Error(err, "select global failed")
mgr.Logger.Info("select global failed", "error", err)
return nil
}

masterStatus, err := mgr.GetMasterStatus(ctx, mgr.DB)
if err != nil {
mgr.Logger.Error(err, "show master status failed")
mgr.Logger.Info("show master status failed", "error", err)
return nil
}

slaveStatus, err := mgr.GetSlaveStatus(ctx, mgr.DB)
if err != nil {
mgr.Logger.Error(err, "show slave status failed")
mgr.Logger.Info("show slave status failed", "error", err)
return nil
}

opTimestamp, err := mgr.GetOpTimestamp(ctx, mgr.DB)
if err != nil {
mgr.Logger.Error(err, "get op timestamp failed")
mgr.Logger.Info("get op timestamp failed", "error", err)
return nil
}

@@ -336,7 +336,7 @@ func (mgr *Manager) ReadCheck(ctx context.Context, db *sql.DB) bool {
// no healthy database, return true
return true
}
mgr.Logger.Error(err, "Read check failed")
mgr.Logger.Info("Read check failed", "error", err)
return false
}

@@ -422,6 +422,10 @@ func (mgr *Manager) LeaveMemberFromCluster(context.Context, *dcs.Cluster, string

// IsClusterInitialized is a method to check if cluster is initialized or not
func (mgr *Manager) IsClusterInitialized(ctx context.Context, _ *dcs.Cluster) (bool, error) {
err := mgr.EnableSemiSyncIfNeed(ctx)
if err != nil {
return false, err
}
return mgr.EnsureServerID(ctx)
}

@@ -448,7 +452,54 @@ func (mgr *Manager) EnsureServerID(ctx context.Context) (bool, error) {
return true, nil
}

- func (mgr *Manager) Promote(context.Context, *dcs.Cluster) error {
func (mgr *Manager) EnableSemiSyncIfNeed(ctx context.Context) error {
var status string
err := mgr.DB.QueryRowContext(ctx, "SELECT PLUGIN_STATUS FROM INFORMATION_SCHEMA.PLUGINS "+
"WHERE PLUGIN_NAME ='rpl_semi_sync_source';").Scan(&status)
if err != nil {
if err == sql.ErrNoRows {
return nil
}
mgr.Logger.Error(err, "Get rpl_semi_sync_source plugin status failed: %v")
return err
}

// In MySQL 8.0, semi-sync configuration options should not be specified in my.cnf,
// as this may cause the database initialization process to fail:
// [Warning] [MY-013501] [Server] Ignoring --plugin-load[_add] list as the server is running with --initialize(-insecure).
// [ERROR] [MY-000067] [Server] unknown variable 'rpl_semi_sync_master_enabled=1'.
if status == "ACTIVE" {
setSourceEnable := "SET GLOBAL rpl_semi_sync_source_enabled = 1;" +
"SET GLOBAL rpl_semi_sync_source_timeout = 1000;"
_, err = mgr.DB.Exec(setSourceEnable)
if err != nil {
mgr.Logger.Error(err, setSourceEnable+" execute failed")
return err
}
}

err = mgr.DB.QueryRowContext(ctx, "SELECT PLUGIN_STATUS FROM INFORMATION_SCHEMA.PLUGINS "+
"WHERE PLUGIN_NAME ='rpl_semi_sync_replica';").Scan(&status)
if err != nil {
if err == sql.ErrNoRows {
return nil
}
mgr.Logger.Error(err, "Get rpl_semi_sync_replica plugin status failed: %v")
return err
}

if status == "ACTIVE" {
setSourceEnable := "SET GLOBAL rpl_semi_sync_replica_enabled = 1;"
_, err = mgr.DB.Exec(setSourceEnable)
if err != nil {
mgr.Logger.Error(err, setSourceEnable+" execute failed")
return err
}
}
return nil
}

+ func (mgr *Manager) Promote(ctx context.Context, cluster *dcs.Cluster) error {
if (mgr.globalState["super_read_only"] == "0" && mgr.globalState["read_only"] == "0") &&
(len(mgr.slaveStatus) == 0 || (mgr.slaveStatus.GetString("Slave_IO_Running") == "No" &&
mgr.slaveStatus.GetString("Slave_SQL_Running") == "No")) {
@@ -462,6 +513,8 @@ func (mgr *Manager) Promote(context.Context, *dcs.Cluster) error {
return err
}

// refresh db state
mgr.GetDBState(ctx, cluster)
mgr.Logger.Info(fmt.Sprintf("promote success, resp:%v", resp))
return nil
}
@@ -477,7 +530,7 @@ func (mgr *Manager) Demote(context.Context) error {
return nil
}

- func (mgr *Manager) Follow(_ context.Context, cluster *dcs.Cluster) error {
+ func (mgr *Manager) Follow(ctx context.Context, cluster *dcs.Cluster) error {
leaderMember := cluster.GetLeaderMember()
if leaderMember == nil {
return fmt.Errorf("cluster has no leader")
@@ -493,8 +546,10 @@ func (mgr *Manager) Follow(_ context.Context, cluster *dcs.Cluster) error {
}

stopSlave := `stop slave;`
// MySQL 5.7 has a limitation where the length of the master_host cannot exceed 60 characters.
masterHost := cluster.GetMemberShortAddr(*leaderMember)
changeMaster := fmt.Sprintf(`change master to master_host='%s',master_user='%s',master_password='%s',master_port=%s,master_auto_position=1;`,
- cluster.GetMemberAddr(*leaderMember), config.username, config.password, leaderMember.DBPort)
+ masterHost, config.username, config.password, leaderMember.DBPort)
startSlave := `start slave;`

_, err := mgr.DB.Exec(stopSlave + changeMaster + startSlave)
@@ -503,6 +558,8 @@ func (mgr *Manager) Follow(_ context.Context, cluster *dcs.Cluster) error {
return err
}

// refresh db state
mgr.GetDBState(ctx, cluster)
mgr.Logger.Info("successfully follow new leader", "leader-name", leaderMember.Name)
return nil
}
@@ -514,10 +571,10 @@ func (mgr *Manager) isRecoveryConfOutdated(leader string) bool {
return true
}

- ioError := rowMap.GetString("Last_IO_Error")
- sqlError := rowMap.GetString("Last_SQL_Error")
- if ioError != "" || sqlError != "" {
- mgr.Logger.Error(nil, fmt.Sprintf("slave status error, sqlError: %s, ioError: %s", sqlError, ioError))
+ ioRunning := rowMap.GetString("Slave_IO_Running")
+ sqlRunning := rowMap.GetString("Slave_SQL_Running")
+ if ioRunning == "No" || sqlRunning == "No" {
+ mgr.Logger.Error(nil, fmt.Sprintf("slave status error, %v", rowMap))
return true
}

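
Two notes on the manager changes above. First, Promote and Follow now refresh the cached DB state right after a role change, so the next health check sees the new topology instead of a stale snapshot. Second, EnableSemiSyncIfNeed flips the semi-sync GLOBAL variables only when INFORMATION_SCHEMA reports the plugin as ACTIVE, and treats an absent plugin (sql.ErrNoRows) as nothing to do; as the in-code comment explains, putting these options in my.cnf breaks MySQL 8.0 initialization. The probe-then-enable pattern, reduced to a hedged sketch (the helper name and the parameterized query are mine, not the PR's exact statements):

package example

import (
	"context"
	"database/sql"
)

// enableIfActive applies a SET GLOBAL statement only when the named plugin
// is loaded and ACTIVE; a missing plugin is silently skipped.
func enableIfActive(ctx context.Context, db *sql.DB, plugin, enableStmt string) error {
	var status string
	err := db.QueryRowContext(ctx,
		"SELECT PLUGIN_STATUS FROM INFORMATION_SCHEMA.PLUGINS WHERE PLUGIN_NAME = ?",
		plugin).Scan(&status)
	if err == sql.ErrNoRows {
		return nil // plugin not installed: nothing to enable
	}
	if err != nil {
		return err
	}
	if status != "ACTIVE" {
		return nil
	}
	_, err = db.ExecContext(ctx, enableStmt)
	return err
}

On a primary this would be called with "rpl_semi_sync_source" and "SET GLOBAL rpl_semi_sync_source_enabled = 1", and with the rpl_semi_sync_replica pair on replicas.
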
23 changes: 23 additions & 0 deletions pkg/lorry/engines/mysql/manager_test.go
@@ -507,6 +507,8 @@ func TestManager_IsClusterInitialized(t *testing.T) {
manager.serverID = 1

t.Run("query server id failed", func(t *testing.T) {
mock.ExpectQuery("SELECT PLUGIN_STATUS FROM INFORMATION_SCHEMA.PLUGINS " +
"WHERE PLUGIN_NAME ='rpl_semi_sync_source';").WillReturnRows(sqlmock.NewRows([]string{"PLUGIN_STATUS"}))
mock.ExpectQuery("select @@global.server_id").
WillReturnError(fmt.Errorf("some error"))

@@ -517,6 +519,8 @@
})

t.Run("server id equal to manager's server id", func(t *testing.T) {
mock.ExpectQuery("SELECT PLUGIN_STATUS FROM INFORMATION_SCHEMA.PLUGINS " +
"WHERE PLUGIN_NAME ='rpl_semi_sync_source';").WillReturnRows(sqlmock.NewRows([]string{"PLUGIN_STATUS"}))
mock.ExpectQuery("select @@global.server_id").
WillReturnRows(sqlmock.NewRows([]string{"@@global.server_id"}).AddRow(1))

@@ -526,6 +530,8 @@
})

t.Run("set server id failed", func(t *testing.T) {
mock.ExpectQuery("SELECT PLUGIN_STATUS FROM INFORMATION_SCHEMA.PLUGINS " +
"WHERE PLUGIN_NAME ='rpl_semi_sync_source';").WillReturnRows(sqlmock.NewRows([]string{"PLUGIN_STATUS"}))
mock.ExpectQuery("select @@global.server_id").
WillReturnRows(sqlmock.NewRows([]string{"@@global.server_id"}).AddRow(2))
mock.ExpectExec("set global server_id").
@@ -538,6 +544,8 @@
})

t.Run("set server id successfully", func(t *testing.T) {
mock.ExpectQuery("SELECT PLUGIN_STATUS FROM INFORMATION_SCHEMA.PLUGINS " +
"WHERE PLUGIN_NAME ='rpl_semi_sync_source';").WillReturnRows(sqlmock.NewRows([]string{"PLUGIN_STATUS"}))
mock.ExpectQuery("select @@global.server_id").
WillReturnRows(sqlmock.NewRows([]string{"@@global.server_id"}).AddRow(2))
mock.ExpectExec("set global server_id").
@@ -548,6 +556,21 @@
assert.Nil(t, err)
})

t.Run("set semi sync successfully", func(t *testing.T) {
mock.ExpectQuery("SELECT PLUGIN_STATUS FROM INFORMATION_SCHEMA.PLUGINS " +
"WHERE PLUGIN_NAME ='rpl_semi_sync_source';").WillReturnRows(sqlmock.NewRows([]string{"PLUGIN_STATUS"}).AddRow("ACTIVE"))
mock.ExpectExec("SET GLOBAL rpl_semi_sync_source_enabled = 1;" +
"SET GLOBAL rpl_semi_sync_source_timeout = 1000;").
WillReturnResult(sqlmock.NewResult(1, 1))

mock.ExpectQuery("SELECT PLUGIN_STATUS FROM INFORMATION_SCHEMA.PLUGINS " +
"WHERE PLUGIN_NAME ='rpl_semi_sync_replica';").WillReturnRows(sqlmock.NewRows([]string{"PLUGIN_STATUS"}).AddRow("ACTIVE"))
mock.ExpectExec("SET GLOBAL rpl_semi_sync_replica_enabled = 1;").
WillReturnResult(sqlmock.NewResult(1, 1))
err := manager.EnableSemiSyncIfNeed(ctx)
assert.Nil(t, err)
})

if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled expectations: %v", err)
}
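
Because IsClusterInitialized now runs EnableSemiSyncIfNeed before EnsureServerID, and go-sqlmock matches expectations in the order they were queued, each pre-existing subtest has to enqueue the plugin-status query ahead of the server-id query. Returning an empty PLUGIN_STATUS row set makes Scan fail with sql.ErrNoRows, which the new code treats as "plugin not installed, skip". The pattern in isolation, assuming a mock set up as in the file above:

// An empty row set makes Scan return sql.ErrNoRows, so the semi-sync
// branch is skipped and the test proceeds to the server-id expectation.
mock.ExpectQuery("SELECT PLUGIN_STATUS FROM INFORMATION_SCHEMA.PLUGINS").
	WillReturnRows(sqlmock.NewRows([]string{"PLUGIN_STATUS"}))
mock.ExpectQuery("select @@global.server_id").
	WillReturnRows(sqlmock.NewRows([]string{"@@global.server_id"}).AddRow(1))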