Merge pull request #9308 from planetscale/brokenreplicaserving
Estimate replica lag when seconds behind from mysqld is unknown
deepthi authored Dec 7, 2021
2 parents 53ab4f0 + 31560fd commit e689307
Showing 5 changed files with 94 additions and 25 deletions.
11 changes: 9 additions & 2 deletions go/mysql/flavor.go
@@ -315,8 +315,15 @@ func parseReplicationStatus(fields map[string]string) ReplicationStatus {
 	status.SourcePort = int(parseInt)
 	parseInt, _ = strconv.ParseInt(fields["Connect_Retry"], 10, 0)
 	status.ConnectRetry = int(parseInt)
-	parseUint, _ := strconv.ParseUint(fields["Seconds_Behind_Master"], 10, 0)
-	status.ReplicationLagSeconds = uint(parseUint)
+	parseUint, err := strconv.ParseUint(fields["Seconds_Behind_Master"], 10, 0)
+	if err != nil {
+		// we could not parse the value into a valid uint -- most commonly because the value is NULL from the
+		// database -- so let's reflect that the underlying value was unknown on our last check
+		status.ReplicationLagUnknown = true
+	} else {
+		status.ReplicationLagUnknown = false
+		status.ReplicationLagSeconds = uint(parseUint)
+	}
 	parseUint, _ = strconv.ParseUint(fields["Master_Server_Id"], 10, 0)
 	status.SourceServerID = uint(parseUint)

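For illustration, a minimal standalone sketch of the parsing behavior above, assuming nothing beyond the Go standard library (this is not the Vitess code itself): strconv.ParseUint fails on the literal string "NULL" that SHOW SLAVE STATUS reports while the replica cannot compute its lag, so the parse error is recorded as "lag unknown" rather than being silently treated as a lag of zero seconds.

package main

import (
	"fmt"
	"strconv"
)

// parseLag mimics the pattern above: a parse error (for example the value
// "NULL") is reported as unknown instead of being treated as 0 seconds.
func parseLag(raw string) (lagSeconds uint, unknown bool) {
	v, err := strconv.ParseUint(raw, 10, 0)
	if err != nil {
		return 0, true
	}
	return uint(v), false
}

func main() {
	fmt.Println(parseLag("42"))   // 42 false
	fmt.Println(parseLag("NULL")) // 0 true
}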
4 changes: 4 additions & 0 deletions go/mysql/replication_status.go
@@ -30,13 +30,17 @@ type ReplicationStatus struct {
 	// were to finish executing everything that's currently in its relay log.
 	// However, some MySQL flavors don't expose this information,
 	// in which case RelayLogPosition.IsZero() will be true.
+	// If ReplicationLagUnknown is true then we should not rely on the seconds
+	// behind value and we can instead try to calculate the lag ourselves when
+	// appropriate.
 	RelayLogPosition Position
 	FilePosition Position
 	FileRelayLogPosition Position
 	SourceServerID uint
 	IOThreadRunning bool
 	SQLThreadRunning bool
 	ReplicationLagSeconds uint
+	ReplicationLagUnknown bool
 	SourceHost string
 	SourcePort int
 	ConnectRetry int
39 changes: 23 additions & 16 deletions go/test/endtoend/tabletmanager/main_test.go
@@ -35,21 +35,23 @@ import (
 )

 var (
-	clusterInstance *cluster.LocalProcessCluster
-	tmClient *tmc.Client
-	primaryTabletParams mysql.ConnParams
-	replicaTabletParams mysql.ConnParams
-	primaryTablet cluster.Vttablet
-	replicaTablet cluster.Vttablet
-	rdonlyTablet cluster.Vttablet
-	hostname = "localhost"
-	keyspaceName = "ks"
-	shardName = "0"
-	keyspaceShard = "ks/" + shardName
-	dbName = "vt_" + keyspaceName
-	username = "vt_dba"
-	cell = "zone1"
-	sqlSchema = `
+	clusterInstance *cluster.LocalProcessCluster
+	tmClient *tmc.Client
+	primaryTabletParams mysql.ConnParams
+	replicaTabletParams mysql.ConnParams
+	primaryTablet cluster.Vttablet
+	replicaTablet cluster.Vttablet
+	rdonlyTablet cluster.Vttablet
+	hostname = "localhost"
+	keyspaceName = "ks"
+	shardName = "0"
+	keyspaceShard = "ks/" + shardName
+	dbName = "vt_" + keyspaceName
+	username = "vt_dba"
+	cell = "zone1"
+	tabletHealthcheckRefreshInterval = 5 * time.Second
+	tabletUnhealthyThreshold = tabletHealthcheckRefreshInterval * 2
+	sqlSchema = `
 	create table t1(
 		id bigint,
 		value varchar(16),
@@ -93,12 +95,17 @@ func TestMain(m *testing.M) {
 	}

 	// List of users authorized to execute vschema ddl operations
-	clusterInstance.VtGateExtraArgs = []string{"-vschema_ddl_authorized_users=%"}
+	clusterInstance.VtGateExtraArgs = []string{
+		"-vschema_ddl_authorized_users=%",
+		"-discovery_low_replication_lag", tabletUnhealthyThreshold.String(),
+	}
 	// Set extra tablet args for lock timeout
 	clusterInstance.VtTabletExtraArgs = []string{
 		"-lock_tables_timeout", "5s",
 		"-watch_replication_stream",
 		"-enable_replication_reporter",
+		"-health_check_interval", tabletHealthcheckRefreshInterval.String(),
+		"-unhealthy_threshold", tabletUnhealthyThreshold.String(),
 	}
 	// We do not need semiSync for this test case.
 	clusterInstance.EnableSemiSync = false
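As a quick sanity check of the derived constants above (assuming the interval stays at 5 seconds), the values the test ends up passing on the command line work out to 5s and 10s; the sketch below simply prints them the way time.Duration renders them.

package main

import (
	"fmt"
	"time"
)

func main() {
	tabletHealthcheckRefreshInterval := 5 * time.Second
	tabletUnhealthyThreshold := tabletHealthcheckRefreshInterval * 2

	// fmt prints a time.Duration via its String() method, which is the same
	// form the test passes as flag values.
	fmt.Println("-health_check_interval", tabletHealthcheckRefreshInterval) // 5s
	fmt.Println("-unhealthy_threshold", tabletUnhealthyThreshold)           // 10s
	fmt.Println("-discovery_low_replication_lag", tabletUnhealthyThreshold) // 10s
}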
58 changes: 52 additions & 6 deletions go/test/endtoend/tabletmanager/tablet_health_test.go
@@ -24,6 +24,7 @@ import (
 	"strings"
 	"sync"
 	"testing"
+	"time"

 	"github.com/stretchr/testify/require"

@@ -134,7 +135,7 @@ func TestHealthCheck(t *testing.T) {
 	// make sure the health stream is updated
 	result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("VtTabletStreamHealth", "-count", "1", rTablet.Alias)
 	require.NoError(t, err)
-	verifyStreamHealth(t, result)
+	verifyStreamHealth(t, result, true)

 	// then restart replication, make sure we stay healthy
 	err = clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", rTablet.Alias)
@@ -148,7 +149,48 @@
 	require.NoError(t, err)
 	scanner := bufio.NewScanner(strings.NewReader(result))
 	for scanner.Scan() {
-		verifyStreamHealth(t, scanner.Text())
+		verifyStreamHealth(t, scanner.Text(), true)
 	}

+	// stop the replica's source mysqld instance to break replication
+	// and test that the replica tablet becomes unhealthy and non-serving after crossing
+	// the tablet's -unhealthy_threshold and the gateway's -discovery_low_replication_lag
+	err = primaryTablet.MysqlctlProcess.Stop()
+	require.NoError(t, err)
+
+	time.Sleep(tabletUnhealthyThreshold + tabletHealthcheckRefreshInterval)
+
+	// now the replica's VtTabletStreamHealth should show it as unhealthy
+	result, err = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("VtTabletStreamHealth", "-count", "1", rTablet.Alias)
+	require.NoError(t, err)
+	scanner = bufio.NewScanner(strings.NewReader(result))
+	for scanner.Scan() {
+		verifyStreamHealth(t, scanner.Text(), false)
+	}
+
+	// start the primary tablet's mysqld back up
+	primaryTablet.MysqlctlProcess.InitMysql = false
+	err = primaryTablet.MysqlctlProcess.Start()
+	primaryTablet.MysqlctlProcess.InitMysql = true
+	require.NoError(t, err)
+
+	// explicitly start replication on all of the replicas to avoid any test flakiness as they were all
+	// replicating from the primary instance
+	err = clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", rTablet.Alias)
+	require.NoError(t, err)
+	err = clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", replicaTablet.Alias)
+	require.NoError(t, err)
+	err = clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", rdonlyTablet.Alias)
+	require.NoError(t, err)
+
+	time.Sleep(tabletHealthcheckRefreshInterval)
+
+	// now the replica's VtTabletStreamHealth should show it as healthy again
+	result, err = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("VtTabletStreamHealth", "-count", "1", rTablet.Alias)
+	require.NoError(t, err)
+	scanner = bufio.NewScanner(strings.NewReader(result))
+	for scanner.Scan() {
+		verifyStreamHealth(t, scanner.Text(), true)
+	}
+
 	// Manual cleanup of processes
@@ -183,18 +225,22 @@ func checkTabletType(t *testing.T, tabletAlias string, typeWant string) {
 	assert.Equal(t, want, got)
 }

-func verifyStreamHealth(t *testing.T, result string) {
+func verifyStreamHealth(t *testing.T, result string, expectHealthy bool) {
 	var streamHealthResponse querypb.StreamHealthResponse
 	err := json2.Unmarshal([]byte(result), &streamHealthResponse)
 	require.NoError(t, err)
 	serving := streamHealthResponse.GetServing()
 	UID := streamHealthResponse.GetTabletAlias().GetUid()
 	realTimeStats := streamHealthResponse.GetRealtimeStats()
 	replicationLagSeconds := realTimeStats.GetReplicationLagSeconds()
-	assert.True(t, serving, "Tablet should be in serving state")
 	assert.True(t, UID > 0, "Tablet should contain uid")
-	// replicationLagSeconds varies till 7200 so setting safe limit
-	assert.True(t, replicationLagSeconds < 10000, "replica should not be behind primary")
+	if expectHealthy {
+		assert.True(t, serving, "Tablet should be in serving state")
+		// replicationLagSeconds varies till 7200 so setting safe limit
+		assert.True(t, replicationLagSeconds < 10000, "replica should not be behind primary")
+	} else {
+		assert.True(t, (!serving || replicationLagSeconds >= uint32(tabletUnhealthyThreshold.Seconds())), "Tablet should not be in serving and healthy state")
+	}
 }

 func TestHealthCheckDrainedStateDoesNotShutdownQueryService(t *testing.T) {
7 changes: 6 additions & 1 deletion go/vt/vttablet/tabletserver/repltracker/poller.go
@@ -50,7 +50,12 @@ func (p *poller) Status() (time.Duration, error) {
 		return 0, err
 	}

-	if !status.ReplicationRunning() {
+	// If replication is not currently running or we don't know what the lag is -- most commonly
+	// because the replica mysqld is in the process of trying to start replicating from its source
+	// but it hasn't yet reached the point where it can calculate the seconds_behind_master
+	// value and it's thus NULL -- then we will estimate the lag ourselves using the last seen
+	// value + the time elapsed since.
+	if !status.ReplicationRunning() || status.ReplicationLagUnknown {
 		if p.timeRecorded.IsZero() {
 			return 0, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "replication is not running")
 		}
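Conceptually, the fallback branch that status.ReplicationLagUnknown now also reaches extrapolates from the last successfully recorded lag plus the wall-clock time elapsed since it was recorded. The sketch below illustrates that idea only; the field and function names are illustrative, not the poller's actual ones.

package main

import (
	"errors"
	"fmt"
	"time"
)

// estimateLag extrapolates a replication lag when the current value cannot
// be read from mysqld: last recorded lag + time elapsed since it was recorded.
func estimateLag(lastKnownLag time.Duration, timeRecorded time.Time) (time.Duration, error) {
	if timeRecorded.IsZero() {
		// No valid lag has ever been observed, so there is nothing to
		// extrapolate from.
		return 0, errors.New("replication is not running")
	}
	return lastKnownLag + time.Since(timeRecorded), nil
}

func main() {
	// Lag was 3s when last measured, 10 seconds ago -> estimate of roughly 13s.
	lag, err := estimateLag(3*time.Second, time.Now().Add(-10*time.Second))
	fmt.Println(lag.Round(time.Second), err)
}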
