From 2a412d1d6246d3511210604cce7541e23022899f Mon Sep 17 00:00:00 2001 From: PlanetScale Actions Bot <60239337+planetscale-actions-bot@users.noreply.github.com> Date: Fri, 9 Feb 2024 22:47:41 -0800 Subject: [PATCH] cherry pick of 15184 (#4353) --- .golangci.yml | 1 + go/vt/mysqlctl/mysqlctlproto/backup_test.go | 2 - go/vt/sqlparser/comments_test.go | 42 +- .../max_replication_lag_module_test.go | 444 +++++------------- 4 files changed, 130 insertions(+), 359 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index f240888d609..4a071139ba6 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -13,6 +13,7 @@ linters-settings: disable: # not supported when using Generics in 1.18 - nilness - unusedwrite + - loopclosure # fixed in go1.22 linters: disable-all: true diff --git a/go/vt/mysqlctl/mysqlctlproto/backup_test.go b/go/vt/mysqlctl/mysqlctlproto/backup_test.go index a5e31295a0e..cccbb349aa2 100644 --- a/go/vt/mysqlctl/mysqlctlproto/backup_test.go +++ b/go/vt/mysqlctl/mysqlctlproto/backup_test.go @@ -102,8 +102,6 @@ func TestBackupHandleToProto(t *testing.T) { } for _, tt := range tests { - tt := tt - t.Run(tt.bh.testname(), func(t *testing.T) { t.Parallel() diff --git a/go/vt/sqlparser/comments_test.go b/go/vt/sqlparser/comments_test.go index dd22fd7000c..42d02e35652 100644 --- a/go/vt/sqlparser/comments_test.go +++ b/go/vt/sqlparser/comments_test.go @@ -18,15 +18,14 @@ package sqlparser import ( "fmt" - "reflect" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/stretchr/testify/assert" + "vitess.io/vitess/go/vt/sysvars" querypb "vitess.io/vitess/go/vt/proto/query" - "vitess.io/vitess/go/vt/sysvars" ) func TestSplitComments(t *testing.T) { @@ -143,15 +142,9 @@ func TestSplitComments(t *testing.T) { gotSQL, gotComments := SplitMarginComments(testCase.input) gotLeadingComments, gotTrailingComments := gotComments.Leading, gotComments.Trailing - if gotSQL != testCase.outSQL { - t.Errorf("test input: '%s', got SQL\n%+v, want\n%+v", testCase.input, gotSQL, testCase.outSQL) - } - if gotLeadingComments != testCase.outLeadingComments { - t.Errorf("test input: '%s', got LeadingComments\n%+v, want\n%+v", testCase.input, gotLeadingComments, testCase.outLeadingComments) - } - if gotTrailingComments != testCase.outTrailingComments { - t.Errorf("test input: '%s', got TrailingComments\n%+v, want\n%+v", testCase.input, gotTrailingComments, testCase.outTrailingComments) - } + assert.Equal(t, testCase.outSQL, gotSQL, "SQL mismatch") + assert.Equal(t, testCase.outLeadingComments, gotLeadingComments, "LeadingComments mismatch") + assert.Equal(t, testCase.outTrailingComments, gotTrailingComments, "TrailingCommints mismatch") }) } } @@ -225,10 +218,7 @@ a`, }} for _, testCase := range testCases { gotSQL := StripLeadingComments(testCase.input) - - if gotSQL != testCase.outSQL { - t.Errorf("test input: '%s', got SQL\n%+v, want\n%+v", testCase.input, gotSQL, testCase.outSQL) - } + assert.Equal(t, testCase.outSQL, gotSQL) } } @@ -254,10 +244,8 @@ func TestExtractMysqlComment(t *testing.T) { }} for _, testCase := range testCases { gotVersion, gotSQL := ExtractMysqlComment(testCase.input) + assert.Equal(t, testCase.outVersion, gotVersion, "version mismatch") - if gotVersion != testCase.outVersion { - t.Errorf("test input: '%s', got version\n%+v, want\n%+v", testCase.input, gotVersion, testCase.outVersion) - } if gotSQL != testCase.outSQL { t.Errorf("test input: '%s', got SQL\n%+v, want\n%+v", testCase.input, gotSQL, testCase.outSQL) } @@ -369,9 +357,8 @@ func TestExtractCommentDirectives(t *testing.T) { require.Nil(t, vals) return } - if !reflect.DeepEqual(vals.m, testCase.vals) { - t.Errorf("test input: '%v', got vals %T:\n%+v, want %T\n%+v", testCase.input, vals, vals, testCase.vals, testCase.vals) - } + + assert.Equal(t, testCase.vals, vals.m) }) } }) @@ -543,17 +530,16 @@ func TestGetPriorityFromStatement(t *testing.T) { parser := NewTestParser() for _, testCase := range testCases { - theThestCase := testCase - t.Run(theThestCase.query, func(t *testing.T) { + t.Run(testCase.query, func(t *testing.T) { t.Parallel() - stmt, err := parser.Parse(theThestCase.query) + stmt, err := parser.Parse(testCase.query) assert.NoError(t, err) actualPriority, actualError := GetPriorityFromStatement(stmt) - if theThestCase.expectedError != nil { - assert.ErrorIs(t, actualError, theThestCase.expectedError) + if testCase.expectedError != nil { + assert.ErrorIs(t, actualError, testCase.expectedError) } else { assert.NoError(t, err) - assert.Equal(t, theThestCase.expectedPriority, actualPriority) + assert.Equal(t, testCase.expectedPriority, actualPriority) } }) } diff --git a/go/vt/throttler/max_replication_lag_module_test.go b/go/vt/throttler/max_replication_lag_module_test.go index 6379b067412..77be6501e4c 100644 --- a/go/vt/throttler/max_replication_lag_module_test.go +++ b/go/vt/throttler/max_replication_lag_module_test.go @@ -23,10 +23,10 @@ import ( "time" "github.com/stretchr/testify/assert" - - "vitess.io/vitess/go/vt/log" + "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -51,32 +51,33 @@ const ( ) type testFixture struct { + tb testing.TB m *MaxReplicationLagModule ratesHistory *fakeRatesHistory } -func newTestFixtureWithMaxReplicationLag(maxReplicationLag int64) (*testFixture, error) { +func newTestFixtureWithMaxReplicationLag(tb testing.TB, maxReplicationLag int64) *testFixture { config := NewMaxReplicationLagModuleConfig(maxReplicationLag) - return newTestFixture(config) + return newTestFixture(tb, config) } -func newTestFixture(config MaxReplicationLagModuleConfig) (*testFixture, error) { +func newTestFixture(tb testing.TB, config MaxReplicationLagModuleConfig) *testFixture { ratesHistory := newFakeRatesHistory() fc := &fakeClock{} // Do not start at 0*time.Second because than the code cannot distinguish // between a legimate value and a zero time.Time value. fc.setNow(1 * time.Second) m, err := NewMaxReplicationLagModule(config, ratesHistory.aggregatedIntervalHistory, fc.now) - if err != nil { - return nil, err - } + require.NoError(tb, err) + // Updates for the throttler go into a big channel and will be ignored. m.rateUpdateChan = make(chan<- struct{}, 1000) return &testFixture{ + tb: tb, m: m, ratesHistory: ratesHistory, - }, nil + } } // process does the same thing as MaxReplicationLagModule.ProcessRecords() does @@ -91,17 +92,10 @@ func (tf *testFixture) recalculateRate(lagRecord replicationLagRecord) { tf.m.recalculateRate(lagRecord) } -func (tf *testFixture) checkState(state state, rate int64, lastRateChange time.Time) error { - if got, want := tf.m.currentState, state; got != want { - return fmt.Errorf("module in wrong state. got = %v, want = %v", got, want) - } - if got, want := tf.m.MaxRate(), rate; got != want { - return fmt.Errorf("module has wrong MaxRate(). got = %v, want = %v", got, want) - } - if got, want := tf.m.lastRateChange, lastRateChange; got != want { - return fmt.Errorf("module has wrong lastRateChange time. got = %v, want = %v", got, want) - } - return nil +func (tf *testFixture) checkState(state state, rate int64, lastRateChange time.Time) { + require.Equal(tf.tb, state, tf.m.currentState, "module in wrong state") + require.Equal(tf.tb, rate, tf.m.MaxRate(), "module has wrong MaxRate()") + require.Equal(tf.tb, lastRateChange, tf.m.lastRateChange, "module has wrong lastRateChange time") } func TestNewMaxReplicationLagModule_recalculateRate(t *testing.T) { @@ -128,14 +122,11 @@ func TestNewMaxReplicationLagModule_recalculateRate(t *testing.T) { }, } - for _, aTestCase := range testCases { - theCase := aTestCase - + for _, theCase := range testCases { t.Run(theCase.name, func(t *testing.T) { t.Parallel() - fixture, err := newTestFixtureWithMaxReplicationLag(5) - assert.NoError(t, err) + fixture := newTestFixtureWithMaxReplicationLag(t, 5) if theCase.expectPanic { assert.Panics(t, func() { fixture.recalculateRate(theCase.lagRecord) }) @@ -146,15 +137,9 @@ func TestNewMaxReplicationLagModule_recalculateRate(t *testing.T) { } func TestMaxReplicationLagModule_RateNotZeroWhenDisabled(t *testing.T) { - tf, err := newTestFixtureWithMaxReplicationLag(ReplicationLagModuleDisabled) - if err != nil { - t.Fatal(err) - } - + tf := newTestFixtureWithMaxReplicationLag(t, ReplicationLagModuleDisabled) // Initial rate must not be zero. It's ReplicationLagModuleDisabled instead. - if err := tf.checkState(stateIncreaseRate, ReplicationLagModuleDisabled, sinceZero(1*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, ReplicationLagModuleDisabled, sinceZero(1*time.Second)) } func TestMaxReplicationLagModule_InitialStateAndWait(t *testing.T) { @@ -162,34 +147,24 @@ func TestMaxReplicationLagModule_InitialStateAndWait(t *testing.T) { // Overwrite the default config to make sure we test a non-default value. config.InitialRate = 123 config.MaxDurationBetweenIncreasesSec = 23 - tf, err := newTestFixture(config) - if err != nil { - t.Fatal(err) - } + tf := newTestFixture(t, config) // Initial rate must be config.InitialRate. - if err := tf.checkState(stateIncreaseRate, config.InitialRate, sinceZero(1*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, config.InitialRate, sinceZero(1*time.Second)) + // After startup, the next increment won't happen until // config.MaxDurationBetweenIncreasesSec elapsed. - if got, want := tf.m.nextAllowedChangeAfterInit, sinceZero(config.MaxDurationBetweenIncreases()+1*time.Second); got != want { - t.Fatalf("got = %v, want = %v", got, want) - } + require.Equal(t, sinceZero(config.MaxDurationBetweenIncreases()+1*time.Second), tf.m.nextAllowedChangeAfterInit) } // TestMaxReplicationLagModule_Increase tests only the continuous increase of the // rate and assumes that we are well below the replica capacity. func TestMaxReplicationLagModule_Increase(t *testing.T) { - tf, err := newTestFixtureWithMaxReplicationLag(5) - if err != nil { - t.Fatal(err) - } + tf := newTestFixtureWithMaxReplicationLag(t, 5) // We start at config.InitialRate. - if err := tf.checkState(stateIncreaseRate, 100, sinceZero(1*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 100, sinceZero(1*time.Second)) + // After the initial wait period of 62s // (config.MaxDurationBetweenIncreasesSec), regular increments start. @@ -200,31 +175,25 @@ func TestMaxReplicationLagModule_Increase(t *testing.T) { tf.process(lagRecord(sinceZero(70*time.Second), r2, 0)) // Rate was increased to 200 based on actual rate of 100 within [0s, 69s]. // r2 becomes the "replica under test". - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)) + // We have to wait at least config.MinDurationBetweenIncreasesSec (40s) before // the next increase. - if got, want := tf.m.replicaUnderTest.nextAllowedChange, sinceZero(70*time.Second).Add(tf.m.config.MinDurationBetweenIncreases()); got != want { - t.Fatalf("got = %v, want = %v", got, want) - } + require.Equal(t, sinceZero(70*time.Second).Add(tf.m.config.MinDurationBetweenIncreases()), tf.m.replicaUnderTest.nextAllowedChange) + // r2 @ 75s, 0s lag tf.ratesHistory.add(sinceZero(70*time.Second), 100) tf.ratesHistory.add(sinceZero(74*time.Second), 200) tf.process(lagRecord(sinceZero(75*time.Second), r2, 0)) // Lag record was ignored because it's within the wait period. - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)) // r1 @ 80s, 0s lag tf.ratesHistory.add(sinceZero(79*time.Second), 200) tf.process(lagRecord(sinceZero(80*time.Second), r1, 0)) // The r1 lag update was ignored because an increment "under test" is always // locked in with the replica which triggered the increase (r2 this time). - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)) // No increase is possible for the next 20 seconds. @@ -232,25 +201,19 @@ func TestMaxReplicationLagModule_Increase(t *testing.T) { tf.ratesHistory.add(sinceZero(80*time.Second), 200) tf.ratesHistory.add(sinceZero(89*time.Second), 200) tf.process(lagRecord(sinceZero(90*time.Second), r2, 0)) - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)) // r1 @ 100s, 0s lag tf.ratesHistory.add(sinceZero(99*time.Second), 200) tf.process(lagRecord(sinceZero(100*time.Second), r1, 0)) - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)) // Next rate increase is possible after testing the rate for 40s. // r2 @ 110s, 0s lag tf.ratesHistory.add(sinceZero(109*time.Second), 200) tf.process(lagRecord(sinceZero(110*time.Second), r2, 0)) - if err := tf.checkState(stateIncreaseRate, 400, sinceZero(110*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 400, sinceZero(110*time.Second)) } // TestMaxReplicationLagModule_ReplicaUnderTest_LastErrorOrNotUp is @@ -258,19 +221,14 @@ func TestMaxReplicationLagModule_Increase(t *testing.T) { // test that the system makes progress if the currently tracked replica has // LastError set or is no longer tracked. func TestMaxReplicationLagModule_ReplicaUnderTest_LastErrorOrNotUp(t *testing.T) { - tf, err := newTestFixtureWithMaxReplicationLag(5) - if err != nil { - t.Fatal(err) - } + tf := newTestFixtureWithMaxReplicationLag(t, 5) // r2 @ 70s, 0s lag tf.ratesHistory.add(sinceZero(69*time.Second), 100) tf.process(lagRecord(sinceZero(70*time.Second), r2, 0)) // Rate was increased to 200 based on actual rate of 100 within [0s, 69s]. // r2 becomes the "replica under test". - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)) // r2 @ 75s, 0s lag, LastError set rError := lagRecord(sinceZero(75*time.Second), r2, 0) @@ -284,9 +242,7 @@ func TestMaxReplicationLagModule_ReplicaUnderTest_LastErrorOrNotUp(t *testing.T) // We ignore r2 as "replica under test" because it has LastError set. // Instead, we act on r1. // r1 becomes the "replica under test". - if err := tf.checkState(stateIncreaseRate, 400, sinceZero(110*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 400, sinceZero(110*time.Second)) // We'll simulate a shutdown of r1 i.e. we're no longer tracking it. // r1 @ 115s, 0s lag, !Up @@ -302,36 +258,27 @@ func TestMaxReplicationLagModule_ReplicaUnderTest_LastErrorOrNotUp(t *testing.T) // We ignore r1 as "replica under test" because it has !Up set. // Instead, we act on r2. // r2 becomes the "replica under test". - if err := tf.checkState(stateIncreaseRate, 800, sinceZero(150*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 800, sinceZero(150*time.Second)) } // TestMaxReplicationLagModule_ReplicaUnderTest_Timeout tests the safe guard // that a "replica under test" which didn't report its lag for a while will be // cleared such that any other replica can become the new "replica under test". func TestMaxReplicationLagModule_ReplicaUnderTest_Timeout(t *testing.T) { - tf, err := newTestFixtureWithMaxReplicationLag(5) - if err != nil { - t.Fatal(err) - } + tf := newTestFixtureWithMaxReplicationLag(t, 5) // r2 @ 70s, 0s lag tf.ratesHistory.add(sinceZero(69*time.Second), 100) tf.process(lagRecord(sinceZero(70*time.Second), r2, 0)) // Rate was increased to 200 based on actual rate of 100 within [0s, 69s]. // r2 becomes the "replica under test". - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)) // r1 @ 80s, 0s lag (ignored because r2 is the "replica under test") tf.ratesHistory.add(sinceZero(70*time.Second), 100) tf.ratesHistory.add(sinceZero(79*time.Second), 200) tf.process(lagRecord(sinceZero(80*time.Second), r1, 0)) - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)) // r2 as "replica under test" did not report its lag for too long. // We'll ignore it from now and let other replicas trigger rate changes. @@ -340,28 +287,21 @@ func TestMaxReplicationLagModule_ReplicaUnderTest_Timeout(t *testing.T) { // (last rate change + test duration + max duration between increases). tf.ratesHistory.add(sinceZero(172*time.Second), 200) tf.process(lagRecord(sinceZero(173*time.Second), r1, 0)) - if err := tf.checkState(stateIncreaseRate, 400, sinceZero(173*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 400, sinceZero(173*time.Second)) } // TestMaxReplicationLagModule_ReplicaUnderTest_IncreaseToDecrease verifies that // the current "replica under test" is ignored when our state changes from // "stateIncreaseRate" to "stateDecreaseAndGuessRate". func TestMaxReplicationLagModule_ReplicaUnderTest_IncreaseToDecrease(t *testing.T) { - tf, err := newTestFixtureWithMaxReplicationLag(5) - if err != nil { - t.Fatal(err) - } + tf := newTestFixtureWithMaxReplicationLag(t, 5) // r2 @ 70s, 0s lag (triggers the increase state) tf.ratesHistory.add(sinceZero(69*time.Second), 100) tf.process(lagRecord(sinceZero(70*time.Second), r2, 0)) // Rate was increased to 200 based on actual rate of 100 within [0s, 69s]. // r2 becomes the "replica under test". - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)) // r1 @ 80s, 0s lag // This lag record is required in the next step to correctly calculate how @@ -370,16 +310,12 @@ func TestMaxReplicationLagModule_ReplicaUnderTest_IncreaseToDecrease(t *testing. tf.ratesHistory.add(sinceZero(79*time.Second), 200) tf.process(lagRecord(sinceZero(80*time.Second), r1, 0)) // r1 remains the "replica under test". - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)) // r2 @ 90s, 0s lag (ignored because the test duration is not up yet) tf.ratesHistory.add(sinceZero(89*time.Second), 200) tf.process(lagRecord(sinceZero(90*time.Second), r2, 0)) - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)) // r1 @ 100s, 3s lag (above target, provokes a decrease) tf.ratesHistory.add(sinceZero(99*time.Second), 200) @@ -387,18 +323,14 @@ func TestMaxReplicationLagModule_ReplicaUnderTest_IncreaseToDecrease(t *testing. // r1 becomes the "replica under test". // r1's high lag triggered the decrease state and therefore we did not wait // for the pending increase of "replica under test" r2. - if err := tf.checkState(stateDecreaseAndGuessRate, 140, sinceZero(100*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateDecreaseAndGuessRate, 140, sinceZero(100*time.Second)) // r2 lag records are ignored while r1 is the "replica under test". // r2 @ 110s, 0s lag tf.ratesHistory.add(sinceZero(100*time.Second), 200) tf.ratesHistory.add(sinceZero(109*time.Second), 140) tf.process(lagRecord(sinceZero(110*time.Second), r2, 0)) - if err := tf.checkState(stateDecreaseAndGuessRate, 140, sinceZero(100*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateDecreaseAndGuessRate, 140, sinceZero(100*time.Second)) // r1 leaves the "replica under test" as soon as the test duration is up // or its lag improved to a better state. @@ -408,19 +340,14 @@ func TestMaxReplicationLagModule_ReplicaUnderTest_IncreaseToDecrease(t *testing. tf.ratesHistory.add(sinceZero(118*time.Second), 140) tf.process(lagRecord(sinceZero(119*time.Second), r1, 0)) // Rate increases to 170, the middle of: [good, bad] = [140, 200]. - if err := tf.checkState(stateIncreaseRate, 170, sinceZero(119*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 170, sinceZero(119*time.Second)) } // TestMaxReplicationLagModule_ReplicaUnderTest_DecreaseToEmergency verifies // that the current "replica under test" is ignored when our state changes from // "stateDecreaseAndGuessRate" to "stateEmergency". func TestMaxReplicationLagModule_ReplicaUnderTest_DecreaseToEmergency(t *testing.T) { - tf, err := newTestFixtureWithMaxReplicationLag(5) - if err != nil { - t.Fatal(err) - } + tf := newTestFixtureWithMaxReplicationLag(t, 5) // INCREASE @@ -429,9 +356,7 @@ func TestMaxReplicationLagModule_ReplicaUnderTest_DecreaseToEmergency(t *testing // much r1 lags behind due to the rate increase. tf.ratesHistory.add(sinceZero(19*time.Second), 100) tf.process(lagRecord(sinceZero(20*time.Second), r1, 0)) - if err := tf.checkState(stateIncreaseRate, 100, sinceZero(1*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 100, sinceZero(1*time.Second)) // DECREASE @@ -439,9 +364,7 @@ func TestMaxReplicationLagModule_ReplicaUnderTest_DecreaseToEmergency(t *testing tf.ratesHistory.add(sinceZero(39*time.Second), 100) tf.process(lagRecord(sinceZero(40*time.Second), r1, 3)) // r1 becomes the "replica under test". - if err := tf.checkState(stateDecreaseAndGuessRate, 70, sinceZero(40*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateDecreaseAndGuessRate, 70, sinceZero(40*time.Second)) // EMERGENCY @@ -451,9 +374,7 @@ func TestMaxReplicationLagModule_ReplicaUnderTest_DecreaseToEmergency(t *testing tf.ratesHistory.add(sinceZero(49*time.Second), 70) tf.process(lagRecord(sinceZero(50*time.Second), r2, 10)) // r1 overrides r2 as new "replica under test". - if err := tf.checkState(stateEmergency, 35, sinceZero(50*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateEmergency, 35, sinceZero(50*time.Second)) // r1 lag becomes worse than the r1 lag now. We don't care and keep r1 as // "replica under test" for now. @@ -461,9 +382,7 @@ func TestMaxReplicationLagModule_ReplicaUnderTest_DecreaseToEmergency(t *testing tf.ratesHistory.add(sinceZero(50*time.Second), 70) tf.ratesHistory.add(sinceZero(59*time.Second), 35) tf.process(lagRecord(sinceZero(60*time.Second), r1, 15)) - if err := tf.checkState(stateEmergency, 35, sinceZero(50*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateEmergency, 35, sinceZero(50*time.Second)) // INCREASE @@ -472,9 +391,7 @@ func TestMaxReplicationLagModule_ReplicaUnderTest_DecreaseToEmergency(t *testing tf.ratesHistory.add(sinceZero(69*time.Second), 35) tf.process(lagRecord(sinceZero(70*time.Second), r2, 0)) // r2 becomes the new "replica under test". - if err := tf.checkState(stateIncreaseRate, 70, sinceZero(70*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 70, sinceZero(70*time.Second)) // EMERGENCY @@ -484,23 +401,16 @@ func TestMaxReplicationLagModule_ReplicaUnderTest_DecreaseToEmergency(t *testing tf.ratesHistory.add(sinceZero(79*time.Second), 70) tf.process(lagRecord(sinceZero(80*time.Second), r1, 15)) // r1 becomes the new "replica under test". - if err := tf.checkState(stateEmergency, 35, sinceZero(80*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateEmergency, 35, sinceZero(80*time.Second)) } // TestMaxReplicationLagModule_Increase_BadRateUpperBound verifies that a // known bad rate is always the upper bound for any rate increase. func TestMaxReplicationLagModule_Increase_BadRateUpperBound(t *testing.T) { - tf, err := newTestFixtureWithMaxReplicationLag(5) - if err != nil { - t.Fatal(err) - } + tf := newTestFixtureWithMaxReplicationLag(t, 5) // We start at config.InitialRate. - if err := tf.checkState(stateIncreaseRate, 100, sinceZero(1*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 100, sinceZero(1*time.Second)) // Assume that a bad value of 150 was set @ 30s and log error if err := tf.m.memory.markBad(150, sinceZero(30*time.Second)); err != nil { @@ -514,24 +424,17 @@ func TestMaxReplicationLagModule_Increase_BadRateUpperBound(t *testing.T) { // [0s, 69s]. // However, this would go over the bad rate. Therefore, the new rate will be // the middle of [100, 150] ([actual rate, bad rate]). - if err := tf.checkState(stateIncreaseRate, 125, sinceZero(70*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 125, sinceZero(70*time.Second)) } // TestMaxReplicationLagModule_Increase_MinimumProgress verifies that the // calculated new rate is never identical to the current rate and at least by // "memoryGranularity" higher. func TestMaxReplicationLagModule_Increase_MinimumProgress(t *testing.T) { - tf, err := newTestFixtureWithMaxReplicationLag(5) - if err != nil { - t.Fatal(err) - } + tf := newTestFixtureWithMaxReplicationLag(t, 5) // We start at config.InitialRate. - if err := tf.checkState(stateIncreaseRate, 100, sinceZero(1*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 100, sinceZero(1*time.Second)) // Assume that a bad value of 105 was set @ 30s. tf.m.memory.markBad(105, sinceZero(30*time.Second)) @@ -547,26 +450,19 @@ func TestMaxReplicationLagModule_Increase_MinimumProgress(t *testing.T) { // But then the new rate is identical to the old set rate of 100. // In such a case, we always advance the new rate by "memoryGranularity" // (which is currently 5). - if err := tf.checkState(stateIncreaseRate, 105, sinceZero(70*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 105, sinceZero(70*time.Second)) } // TestMaxReplicationLagModule_Decrease verifies that we correctly calculate the // replication rate in the decreaseAndGuessRate state. func TestMaxReplicationLagModule_Decrease(t *testing.T) { - tf, err := newTestFixtureWithMaxReplicationLag(5) - if err != nil { - t.Fatal(err) - } + tf := newTestFixtureWithMaxReplicationLag(t, 5) // r2 @ 70s, 0s lag tf.ratesHistory.add(sinceZero(69*time.Second), 100) tf.process(lagRecord(sinceZero(70*time.Second), r2, 0)) // Rate was increased to 200 based on actual rate of 100 within [0s, 69s]. - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)) // r2 @ 90s, 3s lag (above target, provokes a decrease) tf.ratesHistory.add(sinceZero(70*time.Second), 100) @@ -581,9 +477,7 @@ func TestMaxReplicationLagModule_Decrease(t *testing.T) { // Since this backlog is spread across SpreadBacklogAcrossSec (20s), // the guessed rate gets further reduced by 30 QPS (600 queries / 20s). // Hence, the rate is set to 140 QPS (170 - 30). - if err := tf.checkState(stateDecreaseAndGuessRate, 140, sinceZero(90*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateDecreaseAndGuessRate, 140, sinceZero(90*time.Second)) } // TestMaxReplicationLagModule_Decrease_NoReplicaHistory skips decreasing the @@ -591,44 +485,33 @@ func TestMaxReplicationLagModule_Decrease(t *testing.T) { // replication lag value since the last rate change for r2. Therefore, we cannot // reliably guess its rate and wait for the next available record. func TestMaxReplicationLagModule_Decrease_NoReplicaHistory(t *testing.T) { - tf, err := newTestFixtureWithMaxReplicationLag(10) - if err != nil { - t.Fatal(err) - } + tf := newTestFixtureWithMaxReplicationLag(t, 10) // r2 @ 70s, 0s lag tf.ratesHistory.add(sinceZero(69*time.Second), 100) tf.process(lagRecord(sinceZero(70*time.Second), r2, 0)) // Rate was increased to 200 based on actual rate of 100 within [0s, 69s]. - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)) // r1 @ 80s, 3s lag (above target, but no decrease triggered) tf.ratesHistory.add(sinceZero(70*time.Second), 100) tf.ratesHistory.add(sinceZero(79*time.Second), 200) tf.process(lagRecord(sinceZero(80*time.Second), r1, 3)) // Rate was decreased by 25% (half the emergency decrease) as safety measure. - if err := tf.checkState(stateDecreaseAndGuessRate, 150, sinceZero(80*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateDecreaseAndGuessRate, 150, sinceZero(80*time.Second)) // r2 @ 90s, 0s lag tf.ratesHistory.add(sinceZero(80*time.Second), 200) tf.ratesHistory.add(sinceZero(89*time.Second), 150) tf.process(lagRecord(sinceZero(90*time.Second), r2, 0)) // r2 is ignored because r1 is the "replica under test". - if err := tf.checkState(stateDecreaseAndGuessRate, 150, sinceZero(80*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateDecreaseAndGuessRate, 150, sinceZero(80*time.Second)) // r1 recovers after the rate decrease and triggers a new increase. // r1 @ 100s, 0s lag tf.ratesHistory.add(sinceZero(99*time.Second), 150) tf.process(lagRecord(sinceZero(100*time.Second), r1, 0)) - if err := tf.checkState(stateIncreaseRate, 300, sinceZero(100*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 300, sinceZero(100*time.Second)) } func TestMaxReplicationLagModule_IgnoreNSlowestReplicas_REPLICA(t *testing.T) { @@ -648,33 +531,22 @@ func testIgnoreNSlowestReplicas(t *testing.T, r1UID, r2UID uint32) { config.IgnoreNSlowestRdonlys = 1 typ = "RDONLY" } - tf, err := newTestFixture(config) - if err != nil { - t.Fatal(err) - } + tf := newTestFixture(t, config) // r1 @ 80s, 0s lag tf.ratesHistory.add(sinceZero(79*time.Second), 100) tf.process(lagRecord(sinceZero(80*time.Second), r1UID, 0)) - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(80*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(80*time.Second)) // r2 @ 90s, 10s lag tf.ratesHistory.add(sinceZero(80*time.Second), 100) tf.ratesHistory.add(sinceZero(90*time.Second), 200) tf.process(lagRecord(sinceZero(90*time.Second), r2UID, 10)) // Although r2's lag is high, it's ignored because it's the 1 slowest replica. - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(80*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(80*time.Second)) results := tf.m.results.latestValues() - if got, want := len(results), 2; got != want { - t.Fatalf("skipped replica should have been recorded on the results page. got = %v, want = %v", got, want) - } - if got, want := results[0].Reason, fmt.Sprintf("skipping this replica because it's among the 1 slowest %v tablets", typ); got != want { - t.Fatalf("skipped replica should have been recorded as skipped on the results page. got = %v, want = %v", got, want) - } + require.Len(t, results, 2, "skipped replica should have been recorded on the results page") + require.Equal(t, fmt.Sprintf("skipping this replica because it's among the 1 slowest %v tablets", typ), results[0].Reason, "skipped replica should have been recorded as skipped on the results page.") // r1 @ 100s, 20s lag tf.ratesHistory.add(sinceZero(99*time.Second), 200) @@ -682,27 +554,20 @@ func testIgnoreNSlowestReplicas(t *testing.T, r1UID, r2UID uint32) { // r1 would become the new 1 slowest replica. However, we do not ignore it // because then we would ignore all known replicas in a row. // => react to the high lag and reduce the rate by 50% from 200 to 100. - if err := tf.checkState(stateEmergency, 100, sinceZero(100*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateEmergency, 100, sinceZero(100*time.Second)) } func TestMaxReplicationLagModule_IgnoreNSlowestReplicas_NotEnoughReplicas(t *testing.T) { config := NewMaxReplicationLagModuleConfig(5) config.IgnoreNSlowestReplicas = 1 - tf, err := newTestFixture(config) - if err != nil { - t.Fatal(err) - } + tf := newTestFixture(t, config) // r2 @ 70s, 10s lag tf.ratesHistory.add(sinceZero(69*time.Second), 100) tf.process(lagRecord(sinceZero(70*time.Second), r2, 10)) // r2 is the 1 slowest replica. However, it's not ignored because then we // would ignore all replicas. Therefore, we react to its lag increase. - if err := tf.checkState(stateEmergency, 50, sinceZero(70*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateEmergency, 50, sinceZero(70*time.Second)) } // TestMaxReplicationLagModule_IgnoreNSlowestReplicas_IsIgnoredDuringIncrease @@ -715,45 +580,34 @@ func TestMaxReplicationLagModule_IgnoreNSlowestReplicas_NotEnoughReplicas(t *tes func TestMaxReplicationLagModule_IgnoreNSlowestReplicas_IsIgnoredDuringIncrease(t *testing.T) { config := NewMaxReplicationLagModuleConfig(5) config.IgnoreNSlowestReplicas = 1 - tf, err := newTestFixture(config) - if err != nil { - t.Fatal(err) - } + tf := newTestFixture(t, config) // r2 @ 70s, 0s lag tf.ratesHistory.add(sinceZero(69*time.Second), 100) tf.process(lagRecord(sinceZero(70*time.Second), r2, 0)) // Rate was increased to 200 based on actual rate of 100 within [0s, 69s]. - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)) // r1 @ 80s, 0s lag tf.ratesHistory.add(sinceZero(70*time.Second), 100) tf.ratesHistory.add(sinceZero(79*time.Second), 200) tf.process(lagRecord(sinceZero(80*time.Second), r1, 0)) // Lag record was ignored because it's within the wait period. - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)) // r2 becomes slow and will be ignored now. // r2 @ 90s, 10s lag tf.ratesHistory.add(sinceZero(89*time.Second), 200) tf.m.replicaLagCache.add(lagRecord(sinceZero(90*time.Second), r2, 10)) // We ignore the 1 slowest replica and do not decrease despite r2's high lag. - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)) // r1 @ 110s, 0s lag tf.ratesHistory.add(sinceZero(109*time.Second), 200) tf.process(lagRecord(sinceZero(110*time.Second), r1, 0)) // Meanwhile, r1 is doing fine and will trigger the next increase because // we're no longer waiting for the ignored r2. - if err := tf.checkState(stateIncreaseRate, 400, sinceZero(110*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 400, sinceZero(110*time.Second)) } // TestMaxReplicationLagModule_IgnoreNSlowestReplicas_IncludeRdonly is the same @@ -764,55 +618,36 @@ func TestMaxReplicationLagModule_IgnoreNSlowestReplicas_IncludeRdonly(t *testing // We ignore up to 1 REPLICA and 1 RDONLY tablet. config.IgnoreNSlowestReplicas = 1 config.IgnoreNSlowestRdonlys = 1 - tf, err := newTestFixture(config) - if err != nil { - t.Fatal(err) - } + tf := newTestFixture(t, config) // r1 @ 80s, 0s lag tf.ratesHistory.add(sinceZero(79*time.Second), 100) tf.process(lagRecord(sinceZero(80*time.Second), r1, 0)) - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(80*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(80*time.Second)) // rdonly1 @ 85s, 0s lag tf.ratesHistory.add(sinceZero(80*time.Second), 100) tf.ratesHistory.add(sinceZero(84*time.Second), 200) tf.process(lagRecord(sinceZero(85*time.Second), rdonly1, 0)) - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(80*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(80*time.Second)) // r2 @ 90s, 10s lag tf.ratesHistory.add(sinceZero(89*time.Second), 200) tf.process(lagRecord(sinceZero(90*time.Second), r2, 10)) // Although r2's lag is high, it's ignored because it's the 1 slowest REPLICA tablet. - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(80*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(80*time.Second)) results := tf.m.results.latestValues() - if got, want := len(results), 3; got != want { - t.Fatalf("skipped replica should have been recorded on the results page. got = %v, want = %v", got, want) - } - if got, want := results[0].Reason, "skipping this replica because it's among the 1 slowest REPLICA tablets"; got != want { - t.Fatalf("skipped replica should have been recorded as skipped on the results page. got = %v, want = %v", got, want) - } + require.Len(t, results, 3, "skipped replica should have been recorded on the results page") + require.Equal(t, "skipping this replica because it's among the 1 slowest REPLICA tablets", results[0].Reason, "skipped replica should have been recorded as skipped on the results page") // rdonly2 @ 95s, 10s lag tf.ratesHistory.add(sinceZero(94*time.Second), 200) tf.process(lagRecord(sinceZero(95*time.Second), rdonly2, 10)) // Although rdonly2's lag is high, it's ignored because it's the 1 slowest RDONLY tablet. - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(80*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(80*time.Second)) results = tf.m.results.latestValues() - if got, want := len(results), 4; got != want { - t.Fatalf("skipped replica should have been recorded on the results page. got = %v, want = %v", got, want) - } - if got, want := results[0].Reason, "skipping this replica because it's among the 1 slowest RDONLY tablets"; got != want { - t.Fatalf("skipped replica should have been recorded as skipped on the results page. got = %v, want = %v", got, want) - } + require.Len(t, results, 4, "skipped replica should have been recorded on the results page") + require.Equal(t, "skipping this replica because it's among the 1 slowest RDONLY tablets", results[0].Reason, "skipped replica should have been recorded as skipped on the results page") // r1 @ 100s, 11s lag tf.ratesHistory.add(sinceZero(99*time.Second), 200) @@ -820,9 +655,7 @@ func TestMaxReplicationLagModule_IgnoreNSlowestReplicas_IncludeRdonly(t *testing // r1 would become the new 1 slowest REPLICA tablet. However, we do not ignore // it because then we would ignore all known replicas in a row. // => react to the high lag and reduce the rate by 50% from 200 to 100. - if err := tf.checkState(stateEmergency, 100, sinceZero(100*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateEmergency, 100, sinceZero(100*time.Second)) // r2 and rdonly are omitted here for brevity. @@ -831,9 +664,7 @@ func TestMaxReplicationLagModule_IgnoreNSlowestReplicas_IncludeRdonly(t *testing // r1 @ 120s, 0s lag tf.ratesHistory.add(sinceZero(119*time.Second), 100) tf.process(lagRecord(sinceZero(120*time.Second), r1, 0)) - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(120*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(120*time.Second)) // rdonly1 @ 125s, 11s lag tf.ratesHistory.add(sinceZero(120*time.Second), 100) @@ -842,9 +673,7 @@ func TestMaxReplicationLagModule_IgnoreNSlowestReplicas_IncludeRdonly(t *testing // rdonly1 would become the new 1 slowest RDONLY tablet. However, we do not // ignore it because then we would ignore all known replicas in a row. // => react to the high lag and reduce the rate by 50% from 200 to 100. - if err := tf.checkState(stateEmergency, 100, sinceZero(125*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateEmergency, 100, sinceZero(125*time.Second)) } // TestMaxReplicationLagModule_EmergencyDoesNotChangeBadValues verifies that a @@ -863,30 +692,21 @@ func TestMaxReplicationLagModule_EmergencyDoesNotChangeBadValues(t *testing.T) { // Use a very aggressive aging rate to verify that bad rates do not age while // we're in the "emergency" state. config.AgeBadRateAfterSec = 21 - tf, err := newTestFixture(config) - if err != nil { - t.Fatal(err) - } + tf := newTestFixture(t, config) // INCREASE (necessary to set a "good" rate in the memory) // r2 @ 70s, 0s lag tf.ratesHistory.add(sinceZero(69*time.Second), 100) tf.process(lagRecord(sinceZero(70*time.Second), r2, 0)) - if err := tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 200, sinceZero(70*time.Second)) // r2 @ 110s, 0s lag tf.ratesHistory.add(sinceZero(70*time.Second), 100) tf.ratesHistory.add(sinceZero(109*time.Second), 200) tf.process(lagRecord(sinceZero(110*time.Second), r2, 0)) - if err := tf.checkState(stateIncreaseRate, 400, sinceZero(110*time.Second)); err != nil { - t.Fatal(err) - } - if got, want := tf.m.memory.highestGood(), int64(200); got != want { - t.Fatalf("wrong good rate: got = %v, want = %v", got, want) - } + tf.checkState(stateIncreaseRate, 400, sinceZero(110*time.Second)) + require.Equal(t, int64(200), tf.m.memory.highestGood(), "wrong good rate") // DECREASE (necessary to set a "bad" rate in the memory) @@ -894,12 +714,8 @@ func TestMaxReplicationLagModule_EmergencyDoesNotChangeBadValues(t *testing.T) { tf.ratesHistory.add(sinceZero(110*time.Second), 200) tf.ratesHistory.add(sinceZero(129*time.Second), 400) tf.process(lagRecord(sinceZero(130*time.Second), r2, 3)) - if err := tf.checkState(stateDecreaseAndGuessRate, 280, sinceZero(130*time.Second)); err != nil { - t.Fatal(err) - } - if got, want := tf.m.memory.lowestBad(), int64(400); got != want { - t.Fatalf("wrong bad rate: got = %v, want = %v", got, want) - } + tf.checkState(stateDecreaseAndGuessRate, 280, sinceZero(130*time.Second)) + require.Equal(t, int64(400), tf.m.memory.lowestBad(), "wrong bad rate") // memory: [good, bad] now is [200, 400]. @@ -910,12 +726,8 @@ func TestMaxReplicationLagModule_EmergencyDoesNotChangeBadValues(t *testing.T) { tf.ratesHistory.add(sinceZero(130*time.Second), 400) tf.ratesHistory.add(sinceZero(139*time.Second), 280) tf.process(lagRecord(sinceZero(140*time.Second), r1, 3600)) - if err := tf.checkState(stateEmergency, 140, sinceZero(140*time.Second)); err != nil { - t.Fatal(err) - } - if got, want := tf.m.memory.lowestBad(), int64(280); got != want { - t.Fatalf("bad rate should change when we transition to the emergency state: got = %v, want = %v", got, want) - } + tf.checkState(stateEmergency, 140, sinceZero(140*time.Second)) + require.Equal(t, int64(280), tf.m.memory.lowestBad(), "bad rate should change when we transition to the emergency state") // memory: [good, bad] now is [200, 280]. @@ -923,9 +735,7 @@ func TestMaxReplicationLagModule_EmergencyDoesNotChangeBadValues(t *testing.T) { tf.ratesHistory.add(sinceZero(140*time.Second), 280) tf.ratesHistory.add(sinceZero(149*time.Second), 140) tf.process(lagRecord(sinceZero(150*time.Second), r2, 0)) - if err := tf.checkState(stateEmergency, 140, sinceZero(140*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateEmergency, 140, sinceZero(140*time.Second)) tf.ratesHistory.add(sinceZero(160*time.Second), 140) // r1 keeps to drive the throttler rate down, but not the bad rate. @@ -940,20 +750,14 @@ func TestMaxReplicationLagModule_EmergencyDoesNotChangeBadValues(t *testing.T) { tf.ratesHistory.add(r1Time, int64(rates[i-1])) } tf.process(lagRecord(r1Time, r1, 3600)) - if err := tf.checkState(stateEmergency, int64(rates[i]), r1Time); err != nil { - t.Fatalf("time=%d: %v", tm, err) - } - if got, want := tf.m.memory.lowestBad(), int64(280); got != want { - t.Fatalf("time=%d: bad rate must not change when the old state is the emergency state: got = %v, want = %v", tm, got, want) - } + tf.checkState(stateEmergency, int64(rates[i]), r1Time) + require.Equal(t, int64(280), tf.m.memory.lowestBad(), "bad rate must not change when the old state is the emergency state") // r2 @ s, 0s lag (ignored because r1 is the "replica under test") r2Time := sinceZero(time.Duration(tm+10) * time.Second) tf.ratesHistory.add(r2Time, int64(rates[i])) tf.process(lagRecord(r2Time, r2, 0)) - if err := tf.checkState(stateEmergency, int64(rates[i]), r1Time); err != nil { - t.Fatalf("time=%d: %v", tm, err) - } + tf.checkState(stateEmergency, int64(rates[i]), r1Time) } // INCREASE @@ -966,25 +770,18 @@ func TestMaxReplicationLagModule_EmergencyDoesNotChangeBadValues(t *testing.T) { tf.ratesHistory.add(sinceZero(339*time.Second), 1) tf.process(lagRecord(sinceZero(340*time.Second), r1, 0)) // New rate is 240, the middle of [good, bad] = [200, 240]. - if err := tf.checkState(stateIncreaseRate, 240, sinceZero(340*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 240, sinceZero(340*time.Second)) } func TestMaxReplicationLagModule_NoIncreaseIfMaxRateWasNotApproached(t *testing.T) { config := NewMaxReplicationLagModuleConfig(5) - tf, err := newTestFixture(config) - if err != nil { - t.Fatal(err) - } + tf := newTestFixture(t, config) // r1 @ 20s, 0s lag // This lag record is required in the next step to correctly calculate how // much r1 lags behind due to the rate increase. tf.process(lagRecord(sinceZero(20*time.Second), r1, 0)) - if err := tf.checkState(stateIncreaseRate, 100, sinceZero(1*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 100, sinceZero(1*time.Second)) // Master gets 10 QPS in second 69. // r1 @ 70s, 0s lag. @@ -993,9 +790,7 @@ func TestMaxReplicationLagModule_NoIncreaseIfMaxRateWasNotApproached(t *testing. tf.ratesHistory.add(sinceZero(69*time.Second), 10) tf.process(lagRecord(sinceZero(70*time.Second), r1, 0)) // r1 becomes the "replica under test". - if err := tf.checkState(stateIncreaseRate, 100, sinceZero(1*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 100, sinceZero(1*time.Second)) } // lagRecord creates a fake record using a fake LegacyTabletStats object. @@ -1034,15 +829,10 @@ func tabletStats(uid, lag uint32) discovery.TabletHealth { func TestApplyLatestConfig(t *testing.T) { config := NewMaxReplicationLagModuleConfig(5) - tf, err := newTestFixture(config) - if err != nil { - t.Fatal(err) - } + tf := newTestFixture(t, config) // We start at config.InitialRate. - if err := tf.checkState(stateIncreaseRate, 100, sinceZero(1*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 100, sinceZero(1*time.Second)) // Change the default MaxIncrease from 100% to 200% and test that it's // correctly propagated. config.MaxIncrease = 2 @@ -1053,9 +843,7 @@ func TestApplyLatestConfig(t *testing.T) { tf.process(lagRecord(sinceZero(70*time.Second), r2, 0)) // Rate was increased to 300 based on an actual rate of 100 within [0s, 69s]. // That's a 200% increase. - if err := tf.checkState(stateIncreaseRate, 300, sinceZero(70*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 300, sinceZero(70*time.Second)) // Now reset the config to its default values. tf.m.resetConfiguration() @@ -1066,7 +854,5 @@ func TestApplyLatestConfig(t *testing.T) { tf.ratesHistory.add(sinceZero(80*time.Second), 300) tf.ratesHistory.add(sinceZero(109*time.Second), 300) tf.process(lagRecord(sinceZero(110*time.Second), r2, 0)) - if err := tf.checkState(stateIncreaseRate, 600, sinceZero(110*time.Second)); err != nil { - t.Fatal(err) - } + tf.checkState(stateIncreaseRate, 600, sinceZero(110*time.Second)) }