Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manage MySQL Replication Status States Properly #9853

Merged
merged 9 commits into from
Mar 11, 2022
2 changes: 1 addition & 1 deletion go/cmd/vtbackup/vtbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
log.Infof("Replication caught up to %v after %v", status.Position, time.Since(waitStartTime))
break
}
if !status.ReplicationRunning() {
if !status.Healthy() {
log.Warning("Replication has stopped before backup could be taken. Trying to restart replication.")
if err := startReplication(ctx, mysqld, topoServer); err != nil {
log.Warningf("Failed to restart replication: %v", err)
Expand Down
30 changes: 30 additions & 0 deletions go/mysql/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,3 +680,33 @@ func IsSchemaApplyError(err error) bool {
}
return false
}

type ReplicationState int

const (
ReplicationStateUnknown ReplicationState = iota
ReplicationStateStopped
ReplicationStateConnecting
ReplicationStateRunning
)

// ReplicationStatusToState converts a value you have for the IO thread(s) or SQL
// thread(s) or Group Replication applier thread(s) from MySQL or intermediate
// layers to a mysql.ReplicationState.
// on,yes,true == ReplicationStateRunning
// off,no,false == ReplicationStateStopped
// connecting == ReplicationStateConnecting
// anything else == ReplicationStateUnknown
func ReplicationStatusToState(s string) ReplicationState {
// Group Replication uses ON instead of Yes
switch strings.ToLower(s) {
case "yes", "on", "true":
return ReplicationStateRunning
case "no", "off", "false":
return ReplicationStateStopped
case "connecting":
return ReplicationStateConnecting
default:
return ReplicationStateUnknown
}
}
6 changes: 4 additions & 2 deletions go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,10 @@ func parseReplicationStatus(fields map[string]string) ReplicationStatus {
status := ReplicationStatus{
SourceHost: fields["Master_Host"],
// These fields are returned from the underlying DB and cannot be renamed
IOThreadRunning: fields["Slave_IO_Running"] == "Yes" || fields["Slave_IO_Running"] == "Connecting",
SQLThreadRunning: fields["Slave_SQL_Running"] == "Yes",
IOState: ReplicationStatusToState(fields["Slave_IO_Running"]),
LastIOError: fields["Last_IO_Error"],
SQLState: ReplicationStatusToState(fields["Slave_SQL_Running"]),
LastSQLError: fields["Last_SQL_Error"],
}
parseInt, _ := strconv.ParseInt(fields["Master_Port"], 10, 0)
status.SourcePort = int(parseInt)
Expand Down
16 changes: 8 additions & 8 deletions go/mysql/flavor_mysqlgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,32 +141,32 @@ func (mysqlGRFlavor) status(c *Conn) (ReplicationStatus, error) {
return res, nil
}

// Populate IOThreadRunning from replication_connection_status
// Populate IOState from replication_connection_status
query = fmt.Sprintf(`SELECT SERVICE_STATE
FROM performance_schema.replication_connection_status
WHERE CHANNEL_NAME='%s'`, chanel)
var ioThreadRunning bool
var connectionState ReplicationState
err = fetchStatusForGroupReplication(c, query, func(values []sqltypes.Value) error {
ioThreadRunning = values[0].ToString() == "ON"
connectionState = ReplicationStatusToState(values[0].ToString())
return nil
})
if err != nil {
return ReplicationStatus{}, err
}
res.IOThreadRunning = ioThreadRunning
// Populate SQLThreadRunning from replication_connection_status
var sqlThreadRunning bool
res.IOState = connectionState
// Populate SQLState from replication_connection_status
var applierState ReplicationState
query = fmt.Sprintf(`SELECT SERVICE_STATE
FROM performance_schema.replication_applier_status_by_coordinator
WHERE CHANNEL_NAME='%s'`, chanel)
err = fetchStatusForGroupReplication(c, query, func(values []sqltypes.Value) error {
sqlThreadRunning = values[0].ToString() == "ON"
applierState = ReplicationStatusToState(values[0].ToString())
return nil
})
if err != nil {
return ReplicationStatus{}, err
}
res.SQLThreadRunning = sqlThreadRunning
res.SQLState = applierState

// Collect lag information
// we use the difference between the last processed transaction's commit time
Expand Down
4 changes: 2 additions & 2 deletions go/mysql/flavor_mysqlgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func TestMysqlGRParsePrimaryGroupMember(t *testing.T) {
parsePrimaryGroupMember(&res, rows)
assert.Equal(t, "host1", res.SourceHost)
assert.Equal(t, 10, res.SourcePort)
assert.Equal(t, false, res.IOThreadRunning)
assert.Equal(t, false, res.SQLThreadRunning)
assert.Equal(t, ReplicationStateUnknown, res.IOState)
assert.Equal(t, ReplicationStateUnknown, res.SQLState)
}

func TestMysqlGRReplicationApplierLagParse(t *testing.T) {
Expand Down
44 changes: 34 additions & 10 deletions go/mysql/replication_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ type ReplicationStatus struct {
FilePosition Position
FileRelayLogPosition Position
SourceServerID uint
IOThreadRunning bool
SQLThreadRunning bool
IOState ReplicationState
LastIOError string
SQLState ReplicationState
LastSQLError string
ReplicationLagSeconds uint
ReplicationLagUnknown bool
SourceHost string
Expand All @@ -47,10 +49,28 @@ type ReplicationStatus struct {
SourceUUID SID
}

// ReplicationRunning returns true iff both the IO and SQL threads are
// running.
func (s *ReplicationStatus) ReplicationRunning() bool {
return s.IOThreadRunning && s.SQLThreadRunning
// Running returns true if both the IO and SQL threads are running.
func (s *ReplicationStatus) Running() bool {
return s.IOState == ReplicationStateRunning && s.SQLState == ReplicationStateRunning
}

// Healthy returns true if both the SQL IO components are healthy
func (s *ReplicationStatus) Healthy() bool {
return s.SQLHealthy() && s.IOHealthy()
}

// IOHealthy returns true if the IO thread is running OR, the
// IO thread is connecting AND there's no IO error from the last
// attempt to connect to the source.
func (s *ReplicationStatus) IOHealthy() bool {
return s.IOState == ReplicationStateRunning ||
(s.IOState == ReplicationStateConnecting && s.LastIOError == "")
}

// SQLHealthy returns true if the SQLState is running.
// For consistency and to support altering this calculation in the future.
func (s *ReplicationStatus) SQLHealthy() bool {
return s.SQLState == ReplicationStateRunning
}

// ReplicationStatusToProto translates a Status to proto3.
Expand All @@ -61,13 +81,15 @@ func ReplicationStatusToProto(s ReplicationStatus) *replicationdatapb.Status {
FilePosition: EncodePosition(s.FilePosition),
FileRelayLogPosition: EncodePosition(s.FileRelayLogPosition),
SourceServerId: uint32(s.SourceServerID),
IoThreadRunning: s.IOThreadRunning,
SqlThreadRunning: s.SQLThreadRunning,
ReplicationLagSeconds: uint32(s.ReplicationLagSeconds),
SourceHost: s.SourceHost,
SourcePort: int32(s.SourcePort),
ConnectRetry: int32(s.ConnectRetry),
SourceUuid: s.SourceUUID.String(),
IoState: int32(s.IOState),
LastIoError: s.LastIOError,
SqlState: int32(s.SQLState),
LastSqlError: s.LastSQLError,
}
}

Expand Down Expand Up @@ -102,13 +124,15 @@ func ProtoToReplicationStatus(s *replicationdatapb.Status) ReplicationStatus {
FilePosition: filePos,
FileRelayLogPosition: fileRelayPos,
SourceServerID: uint(s.SourceServerId),
IOThreadRunning: s.IoThreadRunning,
SQLThreadRunning: s.SqlThreadRunning,
ReplicationLagSeconds: uint(s.ReplicationLagSeconds),
SourceHost: s.SourceHost,
SourcePort: int(s.SourcePort),
ConnectRetry: int(s.ConnectRetry),
SourceUUID: sid,
IOState: ReplicationState(s.IoState),
LastIOError: s.LastIoError,
SQLState: ReplicationState(s.SqlState),
LastSQLError: s.LastSqlError,
}
}

Expand Down
24 changes: 12 additions & 12 deletions go/mysql/replication_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,34 @@ import (

func TestStatusReplicationRunning(t *testing.T) {
input := &ReplicationStatus{
IOThreadRunning: true,
SQLThreadRunning: true,
IOState: ReplicationStatusToState("yes"),
SQLState: ReplicationStatusToState("yes"),
}
want := true
if got := input.ReplicationRunning(); got != want {
t.Errorf("%#v.ReplicationRunning() = %v, want %v", input, got, want)
if got := input.Running(); got != want {
t.Errorf("%#v.Running() = %v, want %v", input, got, want)
}
}

func TestStatusIOThreadNotRunning(t *testing.T) {
input := &ReplicationStatus{
IOThreadRunning: false,
SQLThreadRunning: true,
IOState: ReplicationStatusToState("no"),
SQLState: ReplicationStatusToState("yes"),
}
want := false
if got := input.ReplicationRunning(); got != want {
t.Errorf("%#v.ReplicationRunning() = %v, want %v", input, got, want)
if got := input.Running(); got != want {
t.Errorf("%#v.Running() = %v, want %v", input, got, want)
}
}

func TestStatusSQLThreadNotRunning(t *testing.T) {
input := &ReplicationStatus{
IOThreadRunning: true,
SQLThreadRunning: false,
IOState: ReplicationStatusToState("yes"),
SQLState: ReplicationStatusToState("no"),
}
want := false
if got := input.ReplicationRunning(); got != want {
t.Errorf("%#v.ReplicationRunning() = %v, want %v", input, got, want)
if got := input.Running(); got != want {
t.Errorf("%#v.Running() = %v, want %v", input, got, want)
}
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (be *BuiltinBackupEngine) ExecuteBackup(ctx context.Context, params BackupP
replicaStatus, err := params.Mysqld.ReplicationStatus()
switch err {
case nil:
replicaStartRequired = replicaStatus.ReplicationRunning() && !*DisableActiveReparents
replicaStartRequired = replicaStatus.Healthy() && !*DisableActiveReparents
case mysql.ErrNotReplica:
// keep going if we're the primary, might be a degenerate case
sourceIsPrimary = true
Expand Down
8 changes: 4 additions & 4 deletions go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,10 @@ func (fmd *FakeMysqlDaemon) ReplicationStatus() (mysql.ReplicationStatus, error)
ReplicationLagSeconds: fmd.ReplicationLagSeconds,
// implemented as AND to avoid changing all tests that were
// previously using Replicating = false
IOThreadRunning: fmd.Replicating && fmd.IOThreadRunning,
SQLThreadRunning: fmd.Replicating,
SourceHost: fmd.CurrentSourceHost,
SourcePort: fmd.CurrentSourcePort,
IOState: mysql.ReplicationStatusToState(fmt.Sprintf("%v", fmd.Replicating && fmd.IOThreadRunning)),
SQLState: mysql.ReplicationStatusToState(fmt.Sprintf("%v", fmd.Replicating)),
SourceHost: fmd.CurrentSourceHost,
SourcePort: fmd.CurrentSourcePort,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func WaitForReplicationStart(mysqld MysqlDaemon, replicaStartDeadline int) error
return err
}

if status.ReplicationRunning() {
if status.Running() {
return nil
}
time.Sleep(time.Second)
Expand Down
Loading