Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backwards compatible replication status to state transition #10167

Merged
merged 14 commits into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions go/mysql/replication_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *ReplicationStatus) SQLHealthy() bool {

// ReplicationStatusToProto translates a Status to proto3.
func ReplicationStatusToProto(s ReplicationStatus) *replicationdatapb.Status {
return &replicationdatapb.Status{
replstatuspb := &replicationdatapb.Status{
Position: EncodePosition(s.Position),
RelayLogPosition: EncodePosition(s.RelayLogPosition),
FilePosition: EncodePosition(s.FilePosition),
Expand All @@ -91,6 +91,21 @@ func ReplicationStatusToProto(s ReplicationStatus) *replicationdatapb.Status {
SqlState: int32(s.SQLState),
LastSqlError: s.LastSQLError,
}

// We need to be able to send gRPC response messages from v14 and newer tablets to
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

// v13 and older clients. The older clients will not be processing the IoState or
// SqlState values in the message but instead looking at the IoThreadRunning and
// SqlThreadRunning booleans so we need to map and share this dual state.
// Note: v13 and older clients considered the IO thread state of connecting to
// be equal to running. That is why we do so here when mapping the states.
// This backwards compatibility can be removed in v15+.
if s.IOState == ReplicationStateRunning || s.IOState == ReplicationStateConnecting {
replstatuspb.IoThreadRunning = true
}
if s.SQLState == ReplicationStateRunning {
replstatuspb.SqlThreadRunning = true
}
return replstatuspb
}

// ProtoToReplicationStatus translates a proto Status, or panics.
Expand Down Expand Up @@ -118,7 +133,7 @@ func ProtoToReplicationStatus(s *replicationdatapb.Status) ReplicationStatus {
panic(vterrors.Wrapf(err, "cannot decode SourceUUID"))
}
}
return ReplicationStatus{
replstatus := ReplicationStatus{
Position: pos,
RelayLogPosition: relayPos,
FilePosition: filePos,
Expand All @@ -134,6 +149,30 @@ func ProtoToReplicationStatus(s *replicationdatapb.Status) ReplicationStatus {
SQLState: ReplicationState(s.SqlState),
LastSQLError: s.LastSqlError,
}

// We need to be able to process gRPC response messages from v13 and older tablets.
// In those cases there will be no value (unknown) for the IoState or SqlState but
// the message will have the IoThreadRunning and SqlThreadRunning booleans and we
// need to revert to our assumptions about a binary state as that's all the older
// tablet can provide (really only applicable to the IO status as that is NOT binary
// but rather has three states: Running, Stopped, Connecting).
// This backwards compatibility can be removed in v15+.
if replstatus.IOState == ReplicationStateUnknown {
if s.IoThreadRunning {
replstatus.IOState = ReplicationStateRunning
} else {
replstatus.IOState = ReplicationStateStopped
}
}
if replstatus.SQLState == ReplicationStateUnknown {
if s.SqlThreadRunning {
replstatus.SQLState = ReplicationStateRunning
} else {
replstatus.SQLState = ReplicationStateStopped
}
}

return replstatus
}

// FindErrantGTIDs can be used to find errant GTIDs in the receiver's relay log, by comparing it against all known replicas,
Expand Down
11 changes: 11 additions & 0 deletions go/test/endtoend/cluster/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"testing"
"time"

replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata"

"github.com/stretchr/testify/assert"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -86,6 +88,15 @@ func GetPrimaryPosition(t *testing.T, vttablet Vttablet, hostname string) (strin
return pos, gtID
}

// GetReplicationStatus asks the given vttablet (addressed by its gRPC port on
// hostname) for its current replication status via tmClient. The test is failed
// immediately if the call returns an error.
func GetReplicationStatus(t *testing.T, vttablet *Vttablet, hostname string) *replicationdatapb.Status {
	tablet := getTablet(vttablet.GrpcPort, hostname)
	status, err := tmClient.ReplicationStatus(context.Background(), tablet)
	require.NoError(t, err)
	return status
}

// VerifyRowsInTabletForTable verifies the total number of rows in a table.
// This is used to check that replication has caught up with the changes on primary.
func VerifyRowsInTabletForTable(t *testing.T, vttablet *Vttablet, ksName string, expectedRows int, tableName string) {
Expand Down
41 changes: 41 additions & 0 deletions go/test/endtoend/reparent/plannedreparent/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,3 +378,44 @@ func TestReparentDoesntHangIfPrimaryFails(t *testing.T) {
require.Error(t, err)
assert.Contains(t, out, "primary failed to PopulateReparentJournal")
}

// TestReplicationStatus checks that the replication status reported for a replica
// reflects the state of its IO and SQL threads as they are stopped and started
// individually and together.
func TestReplicationStatus(t *testing.T) {
	defer cluster.PanicHandler(t)
	clusterInstance := utils.SetupReparentCluster(t, true)
	defer utils.TeardownCluster(clusterInstance)
	tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
	utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})

	// All replication changes in this test are applied to tablets[1].
	replica := tablets[1]

	// threadsAfter runs the given statement on the replica as DBA, then fetches
	// the replication status and returns the IO and SQL thread states read from it.
	threadsAfter := func(query string) (bool, bool) {
		err := clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", replica.Alias, query)
		require.NoError(t, err)
		status := cluster.GetReplicationStatus(t, replica, utils.Hostname)
		return utils.ReplicationThreadsStatus(t, status, clusterInstance.VtctlMajorVersion, clusterInstance.VtTabletMajorVersion)
	}

	// With only the SQL thread stopped, the IO thread must still be reported as running.
	ioRunning, sqlRunning := threadsAfter(`STOP SLAVE SQL_THREAD;`)
	require.True(t, ioRunning)
	require.False(t, sqlRunning)

	// With replication fully stopped, neither thread may be reported as running.
	ioRunning, sqlRunning = threadsAfter(`STOP SLAVE;`)
	require.False(t, ioRunning)
	require.False(t, sqlRunning)

	// After restarting replication, both threads must be reported as running.
	ioRunning, sqlRunning = threadsAfter(`START SLAVE;`)
	require.True(t, ioRunning)
	require.True(t, sqlRunning)

	// With only the IO thread stopped, the SQL thread must still be reported as running.
	ioRunning, sqlRunning = threadsAfter(`STOP SLAVE IO_THREAD;`)
	require.False(t, ioRunning)
	require.True(t, sqlRunning)
}
36 changes: 36 additions & 0 deletions go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -744,3 +745,38 @@ func CheckReplicationStatus(ctx context.Context, t *testing.T, tablet *cluster.V
require.Equal(t, "No", res.Rows[0][11].ToString())
}
}

// ReplicationThreadsStatus returns whether the IO and SQL replication threads are
// running, in that order, as read from the given replication status.
//
// vttabletVersion selects the consistency checks applied to the message:
//   - 13: the tablet does not populate IoState/SqlState, so both must be unknown.
//   - otherwise: both states must be known, and the legacy IoThreadRunning and
//     SqlThreadRunning booleans must agree with them.
//
// vtctlVersion selects which fields the result is read from:
//   - 13: only the legacy booleans, since that is what an old vtctl would read.
//   - otherwise: the state fields when they are known, falling back to the
//     legacy booleans when they are not.
//
// Any failed check fails the test immediately through require.
func ReplicationThreadsStatus(t *testing.T, status *replicationdatapb.Status, vtctlVersion, vttabletVersion int) (bool, bool) {
	ioState := mysql.ReplicationState(status.IoState)
	sqlState := mysql.ReplicationState(status.SqlState)

	if vttabletVersion == 13 {
		// If vttablet is version 13, then the new fields should be unknown.
		require.Equal(t, mysql.ReplicationStateUnknown, ioState)
		require.Equal(t, mysql.ReplicationStateUnknown, sqlState)
	} else {
		// For the new vttablet, the new fields should not be unknown. Moreover, the
		// old fields should also be provided and should agree with the new ones.
		require.NotEqual(t, mysql.ReplicationStateUnknown, ioState)
		require.NotEqual(t, mysql.ReplicationStateUnknown, sqlState)
		// New tablets report IoThreadRunning as true for both the running and the
		// connecting IO state (to match how older clients interpret the boolean),
		// so the legacy boolean has to be compared against both states. Comparing
		// against running alone would fail whenever the IO thread is connecting.
		ioRunningOrConnecting := ioState == mysql.ReplicationStateRunning || ioState == mysql.ReplicationStateConnecting
		require.Equal(t, status.IoThreadRunning, ioRunningOrConnecting)
		require.Equal(t, status.SqlThreadRunning, sqlState == mysql.ReplicationStateRunning)
	}

	// If the vtctl version provided is 13, then we should read the old fields,
	// since that is what an old vtctl would do.
	if vtctlVersion == 13 {
		return status.IoThreadRunning, status.SqlThreadRunning
	}

	// At the latest vtctl version we read the new fields if they are provided,
	// otherwise the old ones.
	ioThread := status.IoThreadRunning
	if ioState != mysql.ReplicationStateUnknown {
		ioThread = ioState == mysql.ReplicationStateRunning
	}
	sqlThread := status.SqlThreadRunning
	if sqlState != mysql.ReplicationStateUnknown {
		sqlThread = sqlState == mysql.ReplicationStateRunning
	}
	return ioThread, sqlThread
}
Loading