Skip to content

Commit

Permalink
Backwards compatible replication status to state transition (#10167)
Browse files Browse the repository at this point in the history
* This effectively reverts 65226ad

It was NOT backwards compatible.

Signed-off-by: Matt Lord <mattalord@gmail.com>

* This adds the new state fields back so that we can switch to them in v15

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Allow for safe/smooth upgrades within 14.0-SNAPSHOT

We are only appending the last io_thread_connecting field

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Avoid intermediate io_thread_connecting protobuf field

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Continue using replication states in v14 tests

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Translate SQL running status to state as well

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Use backward compat ReplicaWasRunning check

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Add backward compat SQLThreadWasRunning function

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Add comment about when backward compat can be removed

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Support older clients with new tablets

Signed-off-by: Matt Lord <mattalord@gmail.com>

* feat: remove SQLThreadWasRunning unused function

Signed-off-by: Manan Gupta <manan@planetscale.com>

* test: add an upgrade test to verify the replicationstatus is backward compatible

Signed-off-by: Manan Gupta <manan@planetscale.com>

* feat: fix ReplicaWasRunning so that it doesn't have code dependent on the upgrade

Signed-off-by: Manan Gupta <manan@planetscale.com>

* refactor: rename function to reflect the output type

Signed-off-by: Manan Gupta <manan@planetscale.com>

Co-authored-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
mattlord and GuptaManan100 authored Apr 29, 2022
1 parent 8a5950d commit fbf574c
Show file tree
Hide file tree
Showing 10 changed files with 449 additions and 63 deletions.
43 changes: 41 additions & 2 deletions go/mysql/replication_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *ReplicationStatus) SQLHealthy() bool {

// ReplicationStatusToProto translates a Status to proto3.
func ReplicationStatusToProto(s ReplicationStatus) *replicationdatapb.Status {
return &replicationdatapb.Status{
replstatuspb := &replicationdatapb.Status{
Position: EncodePosition(s.Position),
RelayLogPosition: EncodePosition(s.RelayLogPosition),
FilePosition: EncodePosition(s.FilePosition),
Expand All @@ -91,6 +91,21 @@ func ReplicationStatusToProto(s ReplicationStatus) *replicationdatapb.Status {
SqlState: int32(s.SQLState),
LastSqlError: s.LastSQLError,
}

// We need to be able to send gRPC response messages from v14 and newer tablets to
// v13 and older clients. The older clients will not be processing the IoState or
// SqlState values in the message but instead looking at the IoThreadRunning and
// SqlThreadRunning booleans so we need to map and share this dual state.
// Note: v13 and older clients considered the IO thread state of connecting to
// be equal to running. That is why we do so here when mapping the states.
// This backwards compatibility can be removed in v15+.
if s.IOState == ReplicationStateRunning || s.IOState == ReplicationStateConnecting {
replstatuspb.IoThreadRunning = true
}
if s.SQLState == ReplicationStateRunning {
replstatuspb.SqlThreadRunning = true
}
return replstatuspb
}

// ProtoToReplicationStatus translates a proto Status, or panics.
Expand Down Expand Up @@ -118,7 +133,7 @@ func ProtoToReplicationStatus(s *replicationdatapb.Status) ReplicationStatus {
panic(vterrors.Wrapf(err, "cannot decode SourceUUID"))
}
}
return ReplicationStatus{
replstatus := ReplicationStatus{
Position: pos,
RelayLogPosition: relayPos,
FilePosition: filePos,
Expand All @@ -134,6 +149,30 @@ func ProtoToReplicationStatus(s *replicationdatapb.Status) ReplicationStatus {
SQLState: ReplicationState(s.SqlState),
LastSQLError: s.LastSqlError,
}

// We need to be able to process gRPC response messages from v13 and older tablets.
// In those cases there will be no value (unknown) for the IoState or SqlState but
// the message will have the IoThreadRunning and SqlThreadRunning booleans and we
// need to revert to our assumptions about a binary state as that's all the older
// tablet can provide (really only applicable to the IO status as that is NOT binary
// but rather has three states: Running, Stopped, Connecting).
// This backwards compatibility can be removed in v15+.
if replstatus.IOState == ReplicationStateUnknown {
if s.IoThreadRunning {
replstatus.IOState = ReplicationStateRunning
} else {
replstatus.IOState = ReplicationStateStopped
}
}
if replstatus.SQLState == ReplicationStateUnknown {
if s.SqlThreadRunning {
replstatus.SQLState = ReplicationStateRunning
} else {
replstatus.SQLState = ReplicationStateStopped
}
}

return replstatus
}

// FindErrantGTIDs can be used to find errant GTIDs in the receiver's relay log, by comparing it against all known replicas,
Expand Down
11 changes: 11 additions & 0 deletions go/test/endtoend/cluster/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"testing"
"time"

replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata"

"github.com/stretchr/testify/assert"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -86,6 +88,15 @@ func GetPrimaryPosition(t *testing.T, vttablet Vttablet, hostname string) (strin
return pos, gtID
}

// GetReplicationStatus gets the replication status of given vttablet
func GetReplicationStatus(t *testing.T, vttablet *Vttablet, hostname string) *replicationdatapb.Status {
ctx := context.Background()
vtablet := getTablet(vttablet.GrpcPort, hostname)
pos, err := tmClient.ReplicationStatus(ctx, vtablet)
require.NoError(t, err)
return pos
}

// VerifyRowsInTabletForTable verifies the total number of rows in a table.
// This is used to check that replication has caught up with the changes on primary.
func VerifyRowsInTabletForTable(t *testing.T, vttablet *Vttablet, ksName string, expectedRows int, tableName string) {
Expand Down
41 changes: 41 additions & 0 deletions go/test/endtoend/reparent/plannedreparent/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,3 +378,44 @@ func TestReparentDoesntHangIfPrimaryFails(t *testing.T) {
require.Error(t, err)
assert.Contains(t, out, "primary failed to PopulateReparentJournal")
}

func TestReplicationStatus(t *testing.T) {
defer cluster.PanicHandler(t)
clusterInstance := utils.SetupReparentCluster(t, true)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})

// Stop the SQL_THREAD on tablets[1]
err := clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `STOP SLAVE SQL_THREAD;`)
require.NoError(t, err)
// Check the replication status on tablets[1] and assert that the IO thread is read to be running and SQL thread is stopped
replicationStatus := cluster.GetReplicationStatus(t, tablets[1], utils.Hostname)
ioThread, sqlThread := utils.ReplicationThreadsStatus(t, replicationStatus, clusterInstance.VtctlMajorVersion, clusterInstance.VtTabletMajorVersion)
require.True(t, ioThread)
require.False(t, sqlThread)

// Stop replication on tablets[1] and verify that both the threads are reported as not running
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `STOP SLAVE;`)
require.NoError(t, err)
replicationStatus = cluster.GetReplicationStatus(t, tablets[1], utils.Hostname)
ioThread, sqlThread = utils.ReplicationThreadsStatus(t, replicationStatus, clusterInstance.VtctlMajorVersion, clusterInstance.VtTabletMajorVersion)
require.False(t, ioThread)
require.False(t, sqlThread)

// Start replication on tablets[1] and verify that both the threads are reported as running
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `START SLAVE;`)
require.NoError(t, err)
replicationStatus = cluster.GetReplicationStatus(t, tablets[1], utils.Hostname)
ioThread, sqlThread = utils.ReplicationThreadsStatus(t, replicationStatus, clusterInstance.VtctlMajorVersion, clusterInstance.VtTabletMajorVersion)
require.True(t, ioThread)
require.True(t, sqlThread)

// Stop IO_THREAD on tablets[1] and verify that the IO thread is read to be stopped and SQL thread is running
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `STOP SLAVE IO_THREAD;`)
require.NoError(t, err)
replicationStatus = cluster.GetReplicationStatus(t, tablets[1], utils.Hostname)
ioThread, sqlThread = utils.ReplicationThreadsStatus(t, replicationStatus, clusterInstance.VtctlMajorVersion, clusterInstance.VtTabletMajorVersion)
require.False(t, ioThread)
require.True(t, sqlThread)
}
36 changes: 36 additions & 0 deletions go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -744,3 +745,38 @@ func CheckReplicationStatus(ctx context.Context, t *testing.T, tablet *cluster.V
require.Equal(t, "No", res.Rows[0][11].ToString())
}
}

// ReplicationThreadsStatus returns the status of the IO and SQL thread. It reads the result of the replication status
// based on the vtctl major version provided. It also uses the vttabletVersion to assert on the expectation of the new fields
// being unknown for the old vttablets and that they match for the new vttablets
func ReplicationThreadsStatus(t *testing.T, status *replicationdatapb.Status, vtctlVersion, vttabletVersion int) (bool, bool) {
if vttabletVersion == 13 {
// If vttablet is version 13, then the new fields should be unknown
require.Equal(t, mysql.ReplicationStateUnknown, mysql.ReplicationState(status.IoState))
require.Equal(t, mysql.ReplicationStateUnknown, mysql.ReplicationState(status.SqlState))
} else {
// For the new vttablet, the new parameters should not be unknown. Moreover, the old parameters should also be provided
// and should agree with the new ones
require.NotEqual(t, mysql.ReplicationStateUnknown, mysql.ReplicationState(status.IoState))
require.NotEqual(t, mysql.ReplicationStateUnknown, mysql.ReplicationState(status.SqlState))
require.Equal(t, status.IoThreadRunning, mysql.ReplicationState(status.IoState) == mysql.ReplicationStateRunning)
require.Equal(t, status.SqlThreadRunning, mysql.ReplicationState(status.SqlState) == mysql.ReplicationStateRunning)
}

// if vtctlVersion provided is 13, then we should read the old parameters, since that is what old vtctl would do
if vtctlVersion == 13 {
return status.IoThreadRunning, status.SqlThreadRunning
}
// If we are at the latest vtctl version, we should read the latest parameters if provided otherwise the old ones
ioState := mysql.ReplicationState(status.IoState)
ioThread := status.IoThreadRunning
if ioState != mysql.ReplicationStateUnknown {
ioThread = ioState == mysql.ReplicationStateRunning
}
sqlState := mysql.ReplicationState(status.SqlState)
sqlThread := status.SqlThreadRunning
if sqlState != mysql.ReplicationStateUnknown {
sqlThread = sqlState == mysql.ReplicationStateRunning
}
return ioThread, sqlThread
}
Loading

0 comments on commit fbf574c

Please sign in to comment.