Pass Semi sync information from durability policy and use it to log discrepancies #9533

Merged 32 commits, Jan 25, 2022

Commits:
ae9deed
test: add an e2e test verifying that semi sync is setup correctly aft…
GuptaManan100 Jan 13, 2022
7cab69e
feat: change PromoteReplica rpc to allow passing in semiSync as a par…
GuptaManan100 Jan 14, 2022
f18174b
feat: add semiSync as a parameter to fixSemiSync and use it log diffe…
GuptaManan100 Jan 14, 2022
5413fd5
feat: change parameter in fixSemiSync to be an enum
GuptaManan100 Jan 17, 2022
d93d77f
feat: change InitReplica rpc to allow passing in semiSync as a parameter
GuptaManan100 Jan 17, 2022
0ba29a5
feat: change InitPrimary rpc to allow passing in semiSync as a parameter
GuptaManan100 Jan 17, 2022
f87b857
feat: fix demotePrimary function to fix primary side semi-sync only i…
GuptaManan100 Jan 17, 2022
1377080
feat: change UndoDemotePrimary rpc to allow passing in semiSync as a …
GuptaManan100 Jan 17, 2022
8f045dd
feat: change UndoDemoteMaster rpc to allow passing in semiSync as a p…
GuptaManan100 Jan 17, 2022
1de77a3
feat: change SetReplicationSource rpc to allow passing in semiSync as…
GuptaManan100 Jan 17, 2022
036dbb5
feat: change SetMaster rpc to allow passing in semiSync as a parameter
GuptaManan100 Jan 17, 2022
7a83481
Merge remote-tracking branch 'upstream/main' into semi-sync-durabilit…
GuptaManan100 Jan 19, 2022
bad3a0c
feat: fix startReplication to not change semi-sync settings
GuptaManan100 Jan 19, 2022
1f83942
test: pass correct durability flag to vtctld in reparent tests
GuptaManan100 Jan 19, 2022
f6ea84a
feat: fix changeTypeLocked to allow passing semiSync as a parameter
GuptaManan100 Jan 22, 2022
82e5a47
feat: add durability flag to vtworker
GuptaManan100 Jan 22, 2022
8ac91e1
feat: fix StartReplication to allow passing semiSync as a parameter
GuptaManan100 Jan 22, 2022
2fe8592
Merge remote-tracking branch 'upstream/main' into semi-sync-durabilit…
GuptaManan100 Jan 22, 2022
a696c98
test: fix TestChangeTabletType to have the primary tablet setup in topo
GuptaManan100 Jan 22, 2022
c5e6136
test: fix TestStartReplication to have the primary tablet setup in topo
GuptaManan100 Jan 22, 2022
db477a1
test: add SettingDurabilityPolicies in init function for vtworker tests
GuptaManan100 Jan 22, 2022
243bfc3
test: SetDurabilityPolicies in endtoend tests for InitShardPrimary
GuptaManan100 Jan 22, 2022
9fb94ec
test: add semi-sync durability flag to all the e2e tests using semi-sync
GuptaManan100 Jan 22, 2022
8a75a7b
feat: In ChangeTabletType we should query the durability rules with t…
GuptaManan100 Jan 22, 2022
128a52c
test: only add the durability flag to vtctl if major version exceeds 13
GuptaManan100 Jan 22, 2022
ffbef3f
feat: fix demotePrimary to preserve behaviour
GuptaManan100 Jan 22, 2022
b19bdda
feat: do not log error messages when SemiSyncActionNone
GuptaManan100 Jan 22, 2022
ddcaa78
test: add test for checking logs in fixSemiSync
GuptaManan100 Jan 22, 2022
41acb37
refactor: rename ReplicaSemiSync to IsReplicaSemiSync
GuptaManan100 Jan 25, 2022
a5d5e17
refactor: rename SemiSyncActions values
GuptaManan100 Jan 25, 2022
411a983
refactor: rename durability_policy flag
GuptaManan100 Jan 25, 2022
79d2822
refactor: add comment explaining the use of isPrimarySideSemiSyncEnabled
GuptaManan100 Jan 25, 2022
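
Taken together, commits 5413fd5, b19bdda, and a5d5e17 replace fixSemiSync's boolean parameter with a SemiSyncAction enum. Only the SemiSyncActionNone value is named in the commit messages; a minimal Go sketch of the shape such an enum takes, with the other value names assumed for illustration:

package tabletmanager // package placement assumed

// SemiSyncAction tells a replication-management helper what to do about
// semi-sync, replacing an ambiguous boolean parameter.
type SemiSyncAction int

const (
	// SemiSyncActionNone leaves semi-sync settings untouched and skips
	// discrepancy logging (named in commit b19bdda).
	SemiSyncActionNone SemiSyncAction = iota
	// SemiSyncActionSet enables semi-sync on the tablet (name assumed).
	SemiSyncActionSet
	// SemiSyncActionUnset disables semi-sync on the tablet (name assumed).
	SemiSyncActionUnset
)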
8 changes: 4 additions & 4 deletions go/cmd/vtctl/vtctl.go
@@ -45,9 +45,9 @@ import (
)

var (
- waitTime = flag.Duration("wait-time", 24*time.Hour, "time to wait on an action")
- detachedMode = flag.Bool("detach", false, "detached mode - run vtcl detached from the terminal")
- durability = flag.String("durability", "none", "type of durability to enforce. Default is none. Other values are dictated by registered plugins")
+ waitTime = flag.Duration("wait-time", 24*time.Hour, "time to wait on an action")
+ detachedMode = flag.Bool("detach", false, "detached mode - run vtcl detached from the terminal")
+ durabilityPolicy = flag.String("durability_policy", "none", "type of durability to enforce. Default is none. Other values are dictated by registered plugins")
)

func init() {
@@ -93,7 +93,7 @@ func main() {
log.Warningf("cannot connect to syslog: %v", err)
}

- if err := reparentutil.SetDurabilityPolicy(*durability, nil); err != nil {
+ if err := reparentutil.SetDurabilityPolicy(*durabilityPolicy, nil); err != nil {
log.Errorf("error in setting durability policy: %v", err)
exit.Return(1)
}
7 changes: 7 additions & 0 deletions go/cmd/vtworker/vtworker.go
@@ -37,13 +37,15 @@ import (
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl/reparentutil"
"vitess.io/vitess/go/vt/worker"
)

var (
cell = flag.String("cell", "", "cell to pick servers from")
commandDisplayInterval = flag.Duration("command_display_interval", time.Second, "Interval between each status update when vtworker is executing a single command from the command line")
username = flag.String("username", "", "If set, value is set as immediate caller id in the request and used by vttablet for TableACL check")
+ durabilityPolicy = flag.String("durability_policy", "none", "type of durability to enforce. Default is none. Other values are dictated by registered plugins")
)

func init() {
@@ -78,6 +80,11 @@ func main() {
os.Exit(0)
}

+ if err := reparentutil.SetDurabilityPolicy(*durabilityPolicy, nil); err != nil {
+     log.Errorf("error in setting durability policy: %v", err)
+     exit.Return(1)
+ }

ts := topo.Open()
defer ts.Close()

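vtctl and vtworker now share the same startup wiring: parse a -durability_policy flag and register it with reparentutil before any topo or reparent work happens. A condensed sketch of that shared pattern, assembled from the two hunks above (the log and exit import paths are inferred from the calls shown; they do not appear in this diff):

package main

import (
	"flag"

	"vitess.io/vitess/go/exit"
	"vitess.io/vitess/go/vt/log"
	"vitess.io/vitess/go/vt/vtctl/reparentutil"
)

var durabilityPolicy = flag.String("durability_policy", "none",
	"type of durability to enforce. Default is none. Other values are dictated by registered plugins")

func main() {
	defer exit.Recover() // exit.Return unwinds via panic; Recover turns it into os.Exit
	flag.Parse()

	// Register the policy once at startup; later reparent decisions and
	// semi-sync discrepancy logs all consult it.
	if err := reparentutil.SetDurabilityPolicy(*durabilityPolicy, nil); err != nil {
		log.Errorf("error in setting durability policy: %v", err)
		exit.Return(1)
	}
}
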
@@ -68,6 +68,7 @@ func TestMainSetup(m *testing.M, useMysqlctld bool) {
localCluster = cluster.NewCluster(cell, hostname)
defer localCluster.Teardown()

+ localCluster.VtctldExtraArgs = append(localCluster.VtctldExtraArgs, "-durability_policy=semi_sync")
// Start topo server
err := localCluster.StartTopo()
if err != nil {
1 change: 1 addition & 0 deletions go/test/endtoend/backup/vtbackup/main_test.go
@@ -61,6 +61,7 @@ func TestMain(m *testing.M) {
localCluster = cluster.NewCluster(cell, hostname)
defer localCluster.Teardown()

+ localCluster.VtctldExtraArgs = append(localCluster.VtctldExtraArgs, "-durability_policy=semi_sync")
// Start topo server
err := localCluster.StartTopo()
if err != nil {
1 change: 1 addition & 0 deletions go/test/endtoend/backup/vtctlbackup/backup_utils.go
@@ -83,6 +83,7 @@ var (
// LaunchCluster : starts the cluster as per given params.
func LaunchCluster(setupType int, streamMode string, stripes int) (int, error) {
localCluster = cluster.NewCluster(cell, hostname)
+ localCluster.VtctldExtraArgs = append(localCluster.VtctldExtraArgs, "-durability_policy=semi_sync")

// Start topo server
err := localCluster.StartTopo()
1 change: 1 addition & 0 deletions go/test/endtoend/recovery/pitr/shardedpitr_test.go
@@ -371,6 +371,7 @@ func removeTablets(t *testing.T, tablets []*cluster.Vttablet) {

func initializeCluster(t *testing.T) {
clusterInstance = cluster.NewCluster(cell, hostname)
+ clusterInstance.VtctldExtraArgs = append(clusterInstance.VtctldExtraArgs, "-durability_policy=semi_sync")

// Start topo server
err := clusterInstance.StartTopo()
@@ -108,6 +108,7 @@ func removeTablets(t *testing.T, tablets []*cluster.Vttablet) {

func initializeCluster(t *testing.T) {
clusterInstance = cluster.NewCluster(cell, hostname)
+ clusterInstance.VtctldExtraArgs = append(clusterInstance.VtctldExtraArgs, "-durability_policy=semi_sync")

// Start topo server
err := clusterInstance.StartTopo()
@@ -466,6 +466,7 @@ func removeTablets(t *testing.T, tablets []*cluster.Vttablet) {
func initializeCluster(t *testing.T) (int, error) {

localCluster = cluster.NewCluster(cell, hostname)
+ localCluster.VtctldExtraArgs = append(localCluster.VtctldExtraArgs, "-durability_policy=semi_sync")

// Start topo server
err := localCluster.StartTopo()
1 change: 1 addition & 0 deletions go/test/endtoend/recovery/unshardedrecovery/recovery.go
@@ -82,6 +82,7 @@ func TestMainImpl(m *testing.M) {
localCluster = cluster.NewCluster(cell, hostname)
defer localCluster.Teardown()

+ localCluster.VtctldExtraArgs = append(localCluster.VtctldExtraArgs, "-durability_policy=semi_sync")
// Start topo server
err := localCluster.StartTopo()
if err != nil {
66 changes: 62 additions & 4 deletions go/test/endtoend/reparent/emergencyreparent/ers_test.go
@@ -30,7 +30,7 @@ import (

func TestTrivialERS(t *testing.T) {
defer cluster.PanicHandler(t)
- clusterInstance := utils.SetupReparentCluster(t)
+ clusterInstance := utils.SetupReparentCluster(t, true)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets

@@ -55,7 +55,7 @@

func TestReparentIgnoreReplicas(t *testing.T) {
defer cluster.PanicHandler(t)
- clusterInstance := utils.SetupReparentCluster(t)
+ clusterInstance := utils.SetupReparentCluster(t, true)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
var err error
@@ -97,7 +97,7 @@

func TestReparentDownPrimary(t *testing.T) {
defer cluster.PanicHandler(t)
- clusterInstance := utils.SetupReparentCluster(t)
+ clusterInstance := utils.SetupReparentCluster(t, true)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets

@@ -133,7 +133,7 @@

func TestReparentNoChoiceDownPrimary(t *testing.T) {
defer cluster.PanicHandler(t)
- clusterInstance := utils.SetupReparentCluster(t)
+ clusterInstance := utils.SetupReparentCluster(t, true)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
var err error
@@ -165,3 +165,61 @@ func TestReparentNoChoiceDownPrimary(t *testing.T) {
// bring back the old primary as a replica, check that it catches up
utils.ResurrectTablet(ctx, t, clusterInstance, tablets[0])
}

func TestSemiSyncSetupCorrectly(t *testing.T) {
	t.Run("semi-sync enabled", func(t *testing.T) {
		defer cluster.PanicHandler(t)
		clusterInstance := utils.SetupReparentCluster(t, true)
		defer utils.TeardownCluster(clusterInstance)
		tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets

		utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})
		// Run forced reparent operation, this should proceed unimpeded.
		out, err := utils.Ers(clusterInstance, tablets[1], "60s", "30s")
		require.NoError(t, err, out)

		utils.ConfirmReplication(t, tablets[1], []*cluster.Vttablet{tablets[0], tablets[2], tablets[3]})

		for _, tablet := range tablets {
			utils.CheckSemiSyncSetupCorrectly(t, tablet, "ON")
		}

		// Run planned reparent operation, this should proceed unimpeded.
		out, err = utils.Prs(t, clusterInstance, tablets[0])
		require.NoError(t, err, out)

		utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})

		for _, tablet := range tablets {
			utils.CheckSemiSyncSetupCorrectly(t, tablet, "ON")
		}
	})

	t.Run("semi-sync disabled", func(t *testing.T) {
		defer cluster.PanicHandler(t)
		clusterInstance := utils.SetupReparentCluster(t, false)
		defer utils.TeardownCluster(clusterInstance)
		tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets

		utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})
		// Run forced reparent operation, this should proceed unimpeded.
		out, err := utils.Ers(clusterInstance, tablets[1], "60s", "30s")
		require.NoError(t, err, out)

		utils.ConfirmReplication(t, tablets[1], []*cluster.Vttablet{tablets[0], tablets[2], tablets[3]})

		for _, tablet := range tablets {
			utils.CheckSemiSyncSetupCorrectly(t, tablet, "OFF")
		}

		// Run planned reparent operation, this should proceed unimpeded.
		out, err = utils.Prs(t, clusterInstance, tablets[0])
		require.NoError(t, err, out)

		utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})

		for _, tablet := range tablets {
			utils.CheckSemiSyncSetupCorrectly(t, tablet, "OFF")
		}
	})
}
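
CheckSemiSyncSetupCorrectly is a new helper whose definition sits outside this diff. A minimal sketch of what such an assertion can look like, assuming it reads the stock MySQL semi-sync plugin variable over the tablet's query path; the QueryTablet call and row shape here are assumptions, not the PR's actual implementation:

package utils

import (
	"testing"

	"github.com/stretchr/testify/require"

	"vitess.io/vitess/go/test/endtoend/cluster"
)

// CheckSemiSyncSetupCorrectly asserts that a tablet's replica-side
// semi-sync variable matches the expected "ON"/"OFF" value.
// Sketch only: the real helper may inspect additional variables.
func CheckSemiSyncSetupCorrectly(t *testing.T, tablet *cluster.Vttablet, expected string) {
	// SHOW VARIABLES returns rows of (Variable_name, Value); the value is
	// rendered as the string ON or OFF.
	qr, err := tablet.VttabletProcess.QueryTablet(
		"show variables like 'rpl_semi_sync_slave_enabled'", KeyspaceName, false)
	require.NoError(t, err)
	require.Equal(t, expected, qr.Rows[0][1].ToString())
}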
10 changes: 6 additions & 4 deletions go/test/endtoend/reparent/newfeaturetest/reparent_test.go
@@ -37,6 +37,7 @@ func TestPRSForInitialization(t *testing.T) {
var tablets []*cluster.Vttablet
clusterInstance := cluster.NewCluster("zone1", "localhost")
keyspace := &cluster.Keyspace{Name: utils.KeyspaceName}
+ clusterInstance.VtctldExtraArgs = append(clusterInstance.VtctldExtraArgs, "-durability_policy=semi_sync")
// Start topo server
err := clusterInstance.StartTopo()
require.NoError(t, err)
@@ -107,7 +108,7 @@
// TestERSPromoteRdonly tests that we never end up promoting a rdonly instance as the primary
func TestERSPromoteRdonly(t *testing.T) {
defer cluster.PanicHandler(t)
- clusterInstance := utils.SetupReparentCluster(t)
+ clusterInstance := utils.SetupReparentCluster(t, true)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
var err error
@@ -135,7 +136,7 @@
// TestERSPreventCrossCellPromotion tests that we promote a replica in the same cell as the previous primary if prevent cross cell promotion flag is set
func TestERSPreventCrossCellPromotion(t *testing.T) {
defer cluster.PanicHandler(t)
- clusterInstance := utils.SetupReparentCluster(t)
+ clusterInstance := utils.SetupReparentCluster(t, true)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
var err error
@@ -158,7 +159,7 @@
// caught up to it by pulling transactions from it
func TestPullFromRdonly(t *testing.T) {
defer cluster.PanicHandler(t)
- clusterInstance := utils.SetupReparentCluster(t)
+ clusterInstance := utils.SetupReparentCluster(t, true)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
var err error
@@ -223,7 +224,7 @@
// is stopped on the primary elect.
func TestNoReplicationStatusAndReplicationStopped(t *testing.T) {
defer cluster.PanicHandler(t)
- clusterInstance := utils.SetupReparentCluster(t)
+ clusterInstance := utils.SetupReparentCluster(t, true)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})
@@ -253,6 +254,7 @@ func TestERSForInitialization(t *testing.T) {
var tablets []*cluster.Vttablet
clusterInstance := cluster.NewCluster("zone1", "localhost")
keyspace := &cluster.Keyspace{Name: utils.KeyspaceName}
+ clusterInstance.VtctldExtraArgs = append(clusterInstance.VtctldExtraArgs, "-durability_policy=semi_sync")
// Start topo server
err := clusterInstance.StartTopo()
require.NoError(t, err)
22 changes: 11 additions & 11 deletions go/test/endtoend/reparent/plannedreparent/reparent_test.go
@@ -33,7 +33,7 @@ import (

func TestPrimaryToSpareStateChangeImpossible(t *testing.T) {
defer cluster.PanicHandler(t)
- clusterInstance := utils.SetupReparentCluster(t)
+ clusterInstance := utils.SetupReparentCluster(t, true)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets

@@ -45,7 +45,7 @@

func TestReparentCrossCell(t *testing.T) {
defer cluster.PanicHandler(t)
- clusterInstance := utils.SetupReparentCluster(t)
+ clusterInstance := utils.SetupReparentCluster(t, true)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets

@@ -59,7 +59,7 @@

func TestReparentGraceful(t *testing.T) {
defer cluster.PanicHandler(t)
- clusterInstance := utils.SetupReparentCluster(t)
+ clusterInstance := utils.SetupReparentCluster(t, true)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets

@@ -84,7 +84,7 @@
// TestPRSWithDrainedLaggingTablet tests that PRS succeeds even if we have a lagging drained tablet
func TestPRSWithDrainedLaggingTablet(t *testing.T) {
defer cluster.PanicHandler(t)
- clusterInstance := utils.SetupReparentCluster(t)
+ clusterInstance := utils.SetupReparentCluster(t, true)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets

@@ -111,7 +111,7 @@

func TestReparentReplicaOffline(t *testing.T) {
defer cluster.PanicHandler(t)
- clusterInstance := utils.SetupReparentCluster(t)
+ clusterInstance := utils.SetupReparentCluster(t, true)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets

@@ -128,7 +128,7 @@

func TestReparentAvoid(t *testing.T) {
defer cluster.PanicHandler(t)
- clusterInstance := utils.SetupReparentCluster(t)
+ clusterInstance := utils.SetupReparentCluster(t, true)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
utils.DeleteTablet(t, clusterInstance, tablets[2])
@@ -160,14 +160,14 @@ func TestReparentAvoid(t *testing.T) {

func TestReparentFromOutside(t *testing.T) {
defer cluster.PanicHandler(t)
- clusterInstance := utils.SetupReparentCluster(t)
+ clusterInstance := utils.SetupReparentCluster(t, true)
defer utils.TeardownCluster(clusterInstance)
reparentFromOutside(t, clusterInstance, false)
}

func TestReparentFromOutsideWithNoPrimary(t *testing.T) {
defer cluster.PanicHandler(t)
- clusterInstance := utils.SetupReparentCluster(t)
+ clusterInstance := utils.SetupReparentCluster(t, true)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets

@@ -256,7 +256,7 @@ func reparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProcessClus

func TestReparentWithDownReplica(t *testing.T) {
defer cluster.PanicHandler(t)
- clusterInstance := utils.SetupReparentCluster(t)
+ clusterInstance := utils.SetupReparentCluster(t, true)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets

@@ -299,7 +299,7 @@

func TestChangeTypeSemiSync(t *testing.T) {
defer cluster.PanicHandler(t)
- clusterInstance := utils.SetupReparentCluster(t)
+ clusterInstance := utils.SetupReparentCluster(t, true)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets

@@ -363,7 +363,7 @@

func TestReparentDoesntHangIfPrimaryFails(t *testing.T) {
defer cluster.PanicHandler(t)
- clusterInstance := utils.SetupReparentCluster(t)
+ clusterInstance := utils.SetupReparentCluster(t, true)
defer utils.TeardownCluster(clusterInstance)
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets

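
One recurring change above is the extra boolean now passed to utils.SetupReparentCluster. Its body is outside this diff; a plausible sketch of the signature change, assuming the boolean merely selects which -durability_policy value vtctld starts with (the parameter name and the elided bring-up steps are assumptions):

// SetupReparentCluster brings up the shared reparent test cluster.
// enableSemiSync (name assumed) picks the durability policy for vtctld.
func SetupReparentCluster(t *testing.T, enableSemiSync bool) *cluster.LocalProcessCluster {
	durabilityPolicy := "none"
	if enableSemiSync {
		durabilityPolicy = "semi_sync"
	}
	clusterInstance := cluster.NewCluster("zone1", "localhost")
	clusterInstance.VtctldExtraArgs = append(clusterInstance.VtctldExtraArgs,
		"-durability_policy="+durabilityPolicy)
	// ... topo, keyspace, and tablet bring-up elided ...
	return clusterInstance
}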