Estimate replica lag when seconds behind from mysqld is unknown #9308

Merged: 4 commits, Dec 7, 2021
11 changes: 9 additions & 2 deletions go/mysql/flavor.go
@@ -315,8 +315,15 @@ func parseReplicationStatus(fields map[string]string) ReplicationStatus {
 	status.SourcePort = int(parseInt)
 	parseInt, _ = strconv.ParseInt(fields["Connect_Retry"], 10, 0)
 	status.ConnectRetry = int(parseInt)
-	parseUint, _ := strconv.ParseUint(fields["Seconds_Behind_Master"], 10, 0)
-	status.ReplicationLagSeconds = uint(parseUint)
+	parseUint, err := strconv.ParseUint(fields["Seconds_Behind_Master"], 10, 0)
+	if err != nil {
+		// we could not parse the value into a valid uint -- most commonly because the value is NULL from the
+		// database -- so let's reflect that the underlying value was unknown on our last check
+		status.ReplicationLagUnknown = true
+	} else {
+		status.ReplicationLagUnknown = false
+		status.ReplicationLagSeconds = uint(parseUint)
+	}
 	parseUint, _ = strconv.ParseUint(fields["Master_Server_Id"], 10, 0)
 	status.SourceServerID = uint(parseUint)

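To illustrate why the new error path fires (an editorial sketch, not part of this PR): whatever string ends up in the parsed SHOW SLAVE STATUS field map when mysqld reports Seconds_Behind_Master as NULL -- typically an empty string or the literal "NULL", depending on how the result was converted -- strconv.ParseUint rejects it, and that is exactly the case that now sets ReplicationLagUnknown.

```go
package main

import (
	"fmt"
	"strconv"
)

func main() {
	// Values as they might appear in the parsed field map: a normal numeric
	// lag, the NULL literal, and an empty field.
	for _, raw := range []string{"42", "NULL", ""} {
		lag, err := strconv.ParseUint(raw, 10, 0)
		if err != nil {
			// This is the case where parseReplicationStatus now sets ReplicationLagUnknown = true.
			fmt.Printf("%q -> lag unknown: %v\n", raw, err)
			continue
		}
		fmt.Printf("%q -> %d seconds behind\n", raw, lag)
	}
}
```
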
4 changes: 4 additions & 0 deletions go/mysql/replication_status.go
@@ -30,13 +30,17 @@ type ReplicationStatus struct {
 	// were to finish executing everything that's currently in its relay log.
 	// However, some MySQL flavors don't expose this information,
 	// in which case RelayLogPosition.IsZero() will be true.
+	// If ReplicationLagUnknown is true then we should not rely on the seconds
+	// behind value and we can instead try to calculate the lag ourselves when
+	// appropriate.
 	RelayLogPosition      Position
 	FilePosition          Position
 	FileRelayLogPosition  Position
 	SourceServerID        uint
 	IOThreadRunning       bool
 	SQLThreadRunning      bool
 	ReplicationLagSeconds uint
+	ReplicationLagUnknown bool
 	SourceHost            string
 	SourcePort            int
 	ConnectRetry          int
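As a hedged sketch of how a caller is meant to interpret the new field (the helper below is hypothetical, not part of the PR, and assumes the standard vitess.io/vitess/go/mysql import path): trust ReplicationLagSeconds only when the lag was actually reported on the last check.

```go
package example

import (
	"time"

	"vitess.io/vitess/go/mysql"
)

// reportedLag is a hypothetical helper: it returns the lag reported by mysqld,
// and ok=false when Seconds_Behind_Master was NULL (or otherwise unparseable)
// on the last check, i.e. when the value should not be relied upon.
func reportedLag(s mysql.ReplicationStatus) (lag time.Duration, ok bool) {
	if s.ReplicationLagUnknown {
		return 0, false
	}
	return time.Duration(s.ReplicationLagSeconds) * time.Second, true
}
```
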
39 changes: 23 additions & 16 deletions go/test/endtoend/tabletmanager/main_test.go
@@ -35,21 +35,23 @@ import (
 )
 
 var (
-	clusterInstance     *cluster.LocalProcessCluster
-	tmClient            *tmc.Client
-	primaryTabletParams mysql.ConnParams
-	replicaTabletParams mysql.ConnParams
-	primaryTablet       cluster.Vttablet
-	replicaTablet       cluster.Vttablet
-	rdonlyTablet        cluster.Vttablet
-	hostname            = "localhost"
-	keyspaceName        = "ks"
-	shardName           = "0"
-	keyspaceShard       = "ks/" + shardName
-	dbName              = "vt_" + keyspaceName
-	username            = "vt_dba"
-	cell                = "zone1"
-	sqlSchema           = `
+	clusterInstance                  *cluster.LocalProcessCluster
+	tmClient                         *tmc.Client
+	primaryTabletParams              mysql.ConnParams
+	replicaTabletParams              mysql.ConnParams
+	primaryTablet                    cluster.Vttablet
+	replicaTablet                    cluster.Vttablet
+	rdonlyTablet                     cluster.Vttablet
+	hostname                         = "localhost"
+	keyspaceName                     = "ks"
+	shardName                        = "0"
+	keyspaceShard                    = "ks/" + shardName
+	dbName                           = "vt_" + keyspaceName
+	username                         = "vt_dba"
+	cell                             = "zone1"
+	tabletHealthcheckRefreshInterval = 5 * time.Second
+	tabletUnhealthyThreshold         = tabletHealthcheckRefreshInterval * 2
+	sqlSchema                        = `
 	create table t1(
 		id bigint,
 		value varchar(16),
@@ -93,12 +95,17 @@ func TestMain(m *testing.M) {
 	}
 
 	// List of users authorized to execute vschema ddl operations
-	clusterInstance.VtGateExtraArgs = []string{"-vschema_ddl_authorized_users=%"}
+	clusterInstance.VtGateExtraArgs = []string{
+		"-vschema_ddl_authorized_users=%",
+		"-discovery_low_replication_lag", tabletUnhealthyThreshold.String(),
+	}
 	// Set extra tablet args for lock timeout
 	clusterInstance.VtTabletExtraArgs = []string{
 		"-lock_tables_timeout", "5s",
 		"-watch_replication_stream",
 		"-enable_replication_reporter",
+		"-health_check_interval", tabletHealthcheckRefreshInterval.String(),
+		"-unhealthy_threshold", tabletUnhealthyThreshold.String(),
 	}
 	// We do not need semiSync for this test case.
 	clusterInstance.EnableSemiSync = false
58 changes: 52 additions & 6 deletions go/test/endtoend/tabletmanager/tablet_health_test.go
@@ -24,6 +24,7 @@ import (
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

@@ -134,7 +135,7 @@ func TestHealthCheck(t *testing.T) {
 	// make sure the health stream is updated
 	result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("VtTabletStreamHealth", "-count", "1", rTablet.Alias)
 	require.NoError(t, err)
-	verifyStreamHealth(t, result)
+	verifyStreamHealth(t, result, true)
 
 	// then restart replication, make sure we stay healthy
 	err = clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", rTablet.Alias)
@@ -148,7 +149,48 @@ func TestHealthCheck(t *testing.T) {
 	require.NoError(t, err)
 	scanner := bufio.NewScanner(strings.NewReader(result))
 	for scanner.Scan() {
-		verifyStreamHealth(t, scanner.Text())
+		verifyStreamHealth(t, scanner.Text(), true)
 	}
 
+	// stop the replica's source mysqld instance to break replication
+	// and test that the replica tablet becomes unhealthy and non-serving after crossing
+	// the tablet's -unhealthy_threshold and the gateway's -discovery_low_replication_lag
+	err = primaryTablet.MysqlctlProcess.Stop()
+	require.NoError(t, err)
+
+	time.Sleep(tabletUnhealthyThreshold + tabletHealthcheckRefreshInterval)
+
+	// now the replica's VtTabletStreamHealth should show it as unhealthy
+	result, err = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("VtTabletStreamHealth", "-count", "1", rTablet.Alias)
+	require.NoError(t, err)
+	scanner = bufio.NewScanner(strings.NewReader(result))
+	for scanner.Scan() {
+		verifyStreamHealth(t, scanner.Text(), false)
+	}
+
+	// start the primary tablet's mysqld back up
+	primaryTablet.MysqlctlProcess.InitMysql = false
+	err = primaryTablet.MysqlctlProcess.Start()
+	primaryTablet.MysqlctlProcess.InitMysql = true
+	require.NoError(t, err)
+
+	// explicitly start replication on all of the replicas to avoid any test flakiness as they were all
+	// replicating from the primary instance
+	err = clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", rTablet.Alias)
+	require.NoError(t, err)
+	err = clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", replicaTablet.Alias)
+	require.NoError(t, err)
+	err = clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", rdonlyTablet.Alias)
+	require.NoError(t, err)
+
+	time.Sleep(tabletHealthcheckRefreshInterval)
+
+	// now the replica's VtTabletStreamHealth should show it as healthy again
+	result, err = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("VtTabletStreamHealth", "-count", "1", rTablet.Alias)
+	require.NoError(t, err)
+	scanner = bufio.NewScanner(strings.NewReader(result))
+	for scanner.Scan() {
+		verifyStreamHealth(t, scanner.Text(), true)
+	}
+
 	// Manual cleanup of processes
@@ -183,18 +225,22 @@ func checkTabletType(t *testing.T, tabletAlias string, typeWant string) {
 	assert.Equal(t, want, got)
 }
 
-func verifyStreamHealth(t *testing.T, result string) {
+func verifyStreamHealth(t *testing.T, result string, expectHealthy bool) {
 	var streamHealthResponse querypb.StreamHealthResponse
 	err := json2.Unmarshal([]byte(result), &streamHealthResponse)
 	require.NoError(t, err)
 	serving := streamHealthResponse.GetServing()
 	UID := streamHealthResponse.GetTabletAlias().GetUid()
 	realTimeStats := streamHealthResponse.GetRealtimeStats()
 	replicationLagSeconds := realTimeStats.GetReplicationLagSeconds()
-	assert.True(t, serving, "Tablet should be in serving state")
 	assert.True(t, UID > 0, "Tablet should contain uid")
-	// replicationLagSeconds varies till 7200 so setting safe limit
-	assert.True(t, replicationLagSeconds < 10000, "replica should not be behind primary")
+	if expectHealthy {
+		assert.True(t, serving, "Tablet should be in serving state")
+		// replicationLagSeconds varies till 7200 so setting safe limit
+		assert.True(t, replicationLagSeconds < 10000, "replica should not be behind primary")
+	} else {
+		assert.True(t, (!serving || replicationLagSeconds >= uint32(tabletUnhealthyThreshold.Seconds())), "Tablet should not be in serving and healthy state")
Inline review comment from the PR author on the assertion above: to help explain this assertion, as far as the healthcheck is concerned the tablet can be SERVING while unhealthy with regard to replica lag. For example:

vitess@fb5036c15bc9:/vt/local$ curl -s localhost:15101/debug/status_details
[
 {
  "Key": "Current State",
  "Class": "healthy",
  "Value": "REPLICA: Serving"
 },
 {
  "Key": "Replication Lag",
  "Class": "unhealthy",
  "Value": "25s"
 }
]

+	}
 }

func TestHealthCheckDrainedStateDoesNotShutdownQueryService(t *testing.T) {
7 changes: 6 additions & 1 deletion go/vt/vttablet/tabletserver/repltracker/poller.go
@@ -50,7 +50,12 @@ func (p *poller) Status() (time.Duration, error) {
 		return 0, err
 	}
 
-	if !status.ReplicationRunning() {
+	// If replication is not currently running or we don't know what the lag is -- most commonly
+	// because the replica mysqld is in the process of trying to start replicating from its source
+	// but it hasn't yet reached the point where it can calculate the seconds_behind_master
+	// value and it's thus NULL -- then we will estimate the lag ourselves using the last seen
+	// value + the time elapsed since.
+	if !status.ReplicationRunning() || status.ReplicationLagUnknown {
 		if p.timeRecorded.IsZero() {
 			return 0, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "replication is not running")
 		}
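To make the estimation described in the comment above concrete, here is a minimal standalone sketch (simplified, hypothetical names; not the actual poller, which also records a fresh sample on every successful poll): when the current lag is unknown, extrapolate from the last trusted sample plus the time elapsed since it was recorded.

```go
package example

import (
	"errors"
	"time"
)

// lagEstimator is a simplified stand-in for the poller's bookkeeping.
type lagEstimator struct {
	lastLag      time.Duration // last lag value mysqld actually reported
	timeRecorded time.Time     // when lastLag was observed
}

// record stores a trusted lag sample.
func (e *lagEstimator) record(lag time.Duration) {
	e.lastLag = lag
	e.timeRecorded = time.Now()
}

// estimate is used when replication is stopped or the lag is unknown: assume
// the replica has fallen further behind by however long has passed since the
// last trusted sample.
func (e *lagEstimator) estimate() (time.Duration, error) {
	if e.timeRecorded.IsZero() {
		// No trusted sample yet, so there is nothing to extrapolate from.
		return 0, errors.New("replication is not running")
	}
	return e.lastLag + time.Since(e.timeRecorded), nil
}
```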