Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the race between PromoteReplica and replication manager tick #9859

Merged
6 changes: 6 additions & 0 deletions go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ type FakeMysqlDaemon struct {
// StartReplicationError is used by StartReplication
StartReplicationError error

// PromoteLag is the time for which Promote will stall
PromoteLag time.Duration

// PrimaryStatusError is used by PrimaryStatus
PrimaryStatusError error

Expand Down Expand Up @@ -424,6 +427,9 @@ func (fmd *FakeMysqlDaemon) WaitSourcePos(_ context.Context, pos mysql.Position)

// Promote is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) Promote(hookExtraEnv map[string]string) (mysql.Position, error) {
if fmd.PromoteLag > 0 {
time.Sleep(fmd.PromoteLag)
}
if fmd.PromoteError != nil {
return mysql.Position{}, fmd.PromoteError
}
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtctl/reparentutil/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,12 @@ func ReplicaWasRunning(stopStatus *replicationdatapb.StopReplicationStatus) (boo
// tabletmanager RPC.
//
// It does not start the replication forcefully.
// If we are unable to find the shard primary of the tablet from the topo server
// we exit out without any error.
func SetReplicationSource(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, tablet *topodatapb.Tablet) error {
shardPrimary, err := topotools.GetShardPrimaryForTablet(ctx, ts, tablet)
if err != nil {
// If we didn't find the shard primary, we return without any error
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletmanager/replmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func newReplManager(ctx context.Context, tm *TabletManager, interval time.Durati
}
}

// SetTabletType starts/stops the replication manager ticks based on the tablet type provided.
// It stops the ticks if the tablet type is not a replica type, starts the ticks otherwise.
func (rm *replManager) SetTabletType(tabletType topodatapb.TabletType) {
if *mysqlctl.DisableActiveReparents {
return
Expand Down
11 changes: 11 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,17 @@ func (tm *TabletManager) PromoteReplica(ctx context.Context, semiSync bool) (str
}
defer tm.unlock()

// SetTabletType only stops the replication manager ticks since we are going to promote this tablet
// to primary. We should do this before making any changes to MySQL, otherwise any replication manager
// tick would change settings such as the replication source back.
tm.replManager.SetTabletType(topodatapb.TabletType_PRIMARY)
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
defer func() {
// If PromoteReplica was successful, then this would be a no-op, since we have already stopped
// the replication manager ticks. But if we are unsuccessful, we must restart the ticks,
// so we call SetTabletType in a deferred function.
tm.replManager.SetTabletType(tm.Tablet().Type)
}()

// If Orchestrator is configured then also tell it we're promoting a tablet so it needs to be in maintenance mode
// Do this in the background, as it's best-effort.
go func() {
Expand Down
42 changes: 39 additions & 3 deletions go/vt/vttablet/tabletmanager/rpc_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,59 @@ package tabletmanager

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
"vitess.io/vitess/go/vt/topo/memorytopo"
)

// TestPromoteReplicaHealthTicksStopped checks that the health ticks are not running on the
// replication manager after running PromoteReplica
func TestPromoteReplicaHealthTicksStopped(t *testing.T) {
// TestPromoteReplicaReplicationManagerSuccess verifies that a successful PromoteReplica leaves the
// replication manager ticks stopped, and that they were already stopped before Promote was called on MySQL.
func TestPromoteReplicaReplicationManagerSuccess(t *testing.T) {
	const tickInterval = 100 * time.Millisecond

	ctx := context.Background()
	ts := memorytopo.NewServer("cell1")
	statsTabletTypeCount.ResetAll()
	tm := newTestTM(t, ts, 100, keyspace, shard)
	defer tm.Stop()

	// Restart the replication manager ticks on a short interval, with a callback
	// that does nothing except count how many times it has been invoked.
	var tickCount int
	tm.replManager.ticks.Stop()
	tm.replManager.ticks.SetInterval(tickInterval)
	tm.replManager.ticks.Start(func() { tickCount++ })

	// Make Promote on the fake MySQL daemon stall for a full second, then run PromoteReplica.
	fakeDaemon := tm.MysqlDaemon.(*fakemysqldaemon.FakeMysqlDaemon)
	fakeDaemon.PromoteLag = time.Second

	_, err := tm.PromoteReplica(ctx, false)
	require.NoError(t, err)

	// After a successful promotion the replication manager must no longer be running.
	require.False(t, tm.replManager.ticks.Running())

	// The ticks have to be stopped before Promote is called on the MySQL instance. Promote stalls for
	// a second, so ticks that were still running during it would have fired 9 to 10 times, whereas
	// ticks stopped beforehand would have fired only once or twice. Requiring fewer than 5 ticks
	// distinguishes the two cases.
	require.Less(t, tickCount, 5)
}

// TestPromoteReplicaReplicationManagerFailure checks that the replication manager is still running after PromoteReplica fails.
func TestPromoteReplicaReplicationManagerFailure(t *testing.T) {
ctx := context.Background()
ts := memorytopo.NewServer("cell1")
statsTabletTypeCount.ResetAll()
tm := newTestTM(t, ts, 100, keyspace, shard)
defer tm.Stop()

// The replication manager is expected to be running before the promotion is attempted.
require.True(t, tm.replManager.ticks.Running())
// Make Promote on the fake MySQL daemon return an error and then run PromoteReplica
tm.MysqlDaemon.(*fakemysqldaemon.FakeMysqlDaemon).PromoteError = fmt.Errorf("promote error")
_, err := tm.PromoteReplica(ctx, false)
require.Error(t, err)
// Since the promotion failed, we expect the replication manager to still be running.
require.True(t, tm.replManager.ticks.Running())
}
3 changes: 2 additions & 1 deletion go/vt/wrangler/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ func (wr *Wrangler) StartReplication(ctx context.Context, tablet *topodatapb.Tab

// SetReplicationSource is used to set the replication source on the specified tablet to the current shard primary (if available).
// It also figures out if the tablet should be sending semi-sync ACKs or not and passes that to the tabletmanager RPC.
// It does not start the replication forcefully
// It does not start the replication forcefully. If we are unable to find the shard primary of the tablet from the topo server
// we exit out without any error.
func (wr *Wrangler) SetReplicationSource(ctx context.Context, tablet *topodatapb.Tablet) error {
// Delegate to reparentutil, passing this wrangler's topo server and tablet manager client.
return reparentutil.SetReplicationSource(ctx, wr.ts, wr.TabletManagerClient(), tablet)
}
Expand Down