diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go index 771d9e3479d..33a0a680799 100644 --- a/go/vt/vttablet/tabletserver/query_executor_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_test.go @@ -1857,8 +1857,9 @@ func (m mockTxThrottler) Open() (err error) { return nil } -func (m mockTxThrottler) Close() { -} +func (m mockTxThrottler) Close() {} +func (m mockTxThrottler) MakePrimary() {} +func (m mockTxThrottler) MakeNonPrimary() {} func (m mockTxThrottler) Throttle(priority int, workload string) (result bool) { return m.throttle diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 308f9165ba6..e231a628748 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -174,6 +174,8 @@ type ( txThrottler interface { Open() error Close() + MakePrimary() + MakeNonPrimary() } onlineDDLExecutor interface { @@ -456,6 +458,7 @@ func (sm *stateManager) servePrimary() error { sm.hs.MakePrimary(true) sm.se.MakePrimary(true) sm.rt.MakePrimary() + sm.txThrottler.MakePrimary() sm.tracker.Open() // We instantly kill all stateful queries to allow for // te to quickly transition into RW, but olap and stateless @@ -482,6 +485,7 @@ func (sm *stateManager) unservePrimary() error { sm.se.MakePrimary(false) sm.hs.MakePrimary(false) sm.rt.MakePrimary() + sm.txThrottler.MakePrimary() sm.setState(topodatapb.TabletType_PRIMARY, StateNotServing) return nil } @@ -498,6 +502,7 @@ func (sm *stateManager) serveNonPrimary(wantTabletType topodatapb.TabletType) er sm.tracker.Close() sm.se.MakeNonPrimary() sm.hs.MakeNonPrimary() + sm.txThrottler.MakeNonPrimary() if err := sm.connect(wantTabletType); err != nil { return err @@ -516,6 +521,7 @@ func (sm *stateManager) unserveNonPrimary(wantTabletType topodatapb.TabletType) sm.se.MakeNonPrimary() sm.hs.MakeNonPrimary() + sm.txThrottler.MakeNonPrimary() if err := sm.connect(wantTabletType); err != nil { return err diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index 02896eeefe0..45257c995ad 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -77,18 +77,17 @@ func TestStateManagerServePrimary(t *testing.T) { assert.Equal(t, testNow, sm.ptsTimestamp) verifySubcomponent(t, 1, sm.watcher, testStateClosed) - verifySubcomponent(t, 2, sm.se, testStateOpen) verifySubcomponent(t, 3, sm.vstreamer, testStateOpen) verifySubcomponent(t, 4, sm.qe, testStateOpen) - verifySubcomponent(t, 5, sm.txThrottler, testStateOpen) verifySubcomponent(t, 6, sm.rt, testStatePrimary) - verifySubcomponent(t, 7, sm.tracker, testStateOpen) - verifySubcomponent(t, 8, sm.te, testStatePrimary) - verifySubcomponent(t, 9, sm.messager, testStateOpen) - verifySubcomponent(t, 10, sm.throttler, testStateOpen) - verifySubcomponent(t, 11, sm.tableGC, testStateOpen) - verifySubcomponent(t, 12, sm.ddle, testStateOpen) + verifySubcomponent(t, 7, sm.txThrottler, testStatePrimary) + verifySubcomponent(t, 8, sm.tracker, testStateOpen) + verifySubcomponent(t, 9, sm.te, testStatePrimary) + verifySubcomponent(t, 10, sm.messager, testStateOpen) + verifySubcomponent(t, 11, sm.throttler, testStateOpen) + verifySubcomponent(t, 12, sm.tableGC, testStateOpen) + verifySubcomponent(t, 13, sm.ddle, testStateOpen) assert.False(t, sm.se.(*testSchemaEngine).nonPrimary) assert.True(t, sm.se.(*testSchemaEngine).ensureCalled) @@ -109,14 +108,14 @@ func TestStateManagerServeNonPrimary(t *testing.T) { verifySubcomponent(t, 4, sm.tracker, testStateClosed) assert.True(t, sm.se.(*testSchemaEngine).nonPrimary) - verifySubcomponent(t, 5, sm.se, testStateOpen) - verifySubcomponent(t, 6, sm.vstreamer, testStateOpen) - verifySubcomponent(t, 7, sm.qe, testStateOpen) - verifySubcomponent(t, 8, sm.txThrottler, testStateOpen) - verifySubcomponent(t, 9, sm.te, testStateNonPrimary) - verifySubcomponent(t, 10, sm.rt, testStateNonPrimary) - verifySubcomponent(t, 11, sm.watcher, testStateOpen) - verifySubcomponent(t, 12, sm.throttler, testStateOpen) + verifySubcomponent(t, 6, sm.se, testStateOpen) + verifySubcomponent(t, 7, sm.vstreamer, testStateOpen) + verifySubcomponent(t, 8, sm.qe, testStateOpen) + verifySubcomponent(t, 9, sm.txThrottler, testStateOpen) + verifySubcomponent(t, 10, sm.te, testStateNonPrimary) + verifySubcomponent(t, 11, sm.rt, testStateNonPrimary) + verifySubcomponent(t, 12, sm.watcher, testStateOpen) + verifySubcomponent(t, 13, sm.throttler, testStateOpen) assert.Equal(t, topodatapb.TabletType_REPLICA, sm.target.TabletType) assert.Equal(t, StateServing, sm.state) @@ -139,9 +138,9 @@ func TestStateManagerUnservePrimary(t *testing.T) { verifySubcomponent(t, 8, sm.se, testStateOpen) verifySubcomponent(t, 9, sm.vstreamer, testStateOpen) verifySubcomponent(t, 10, sm.qe, testStateOpen) - verifySubcomponent(t, 11, sm.txThrottler, testStateOpen) verifySubcomponent(t, 12, sm.rt, testStatePrimary) + verifySubcomponent(t, 13, sm.txThrottler, testStatePrimary) assert.Equal(t, topodatapb.TabletType_PRIMARY, sm.target.TabletType) assert.Equal(t, StateNotServing, sm.state) @@ -165,7 +164,7 @@ func TestStateManagerUnserveNonPrimary(t *testing.T) { verifySubcomponent(t, 7, sm.se, testStateOpen) verifySubcomponent(t, 8, sm.vstreamer, testStateOpen) verifySubcomponent(t, 9, sm.qe, testStateOpen) - verifySubcomponent(t, 10, sm.txThrottler, testStateOpen) + verifySubcomponent(t, 10, sm.txThrottler, testStateNonPrimary) verifySubcomponent(t, 11, sm.rt, testStateNonPrimary) verifySubcomponent(t, 12, sm.watcher, testStateOpen) @@ -932,6 +931,16 @@ func (te *testTxThrottler) Close() { te.state = testStateClosed } +func (te *testTxThrottler) MakePrimary() { + te.order = order.Add(1) + te.state = testStatePrimary +} + +func (te *testTxThrottler) MakeNonPrimary() { + te.order = order.Add(1) + te.state = testStateNonPrimary +} + type testOnlineDDLExecutor struct { testOrderState } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 18fafa8eb96..41f075e3802 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -66,6 +66,8 @@ type TxThrottler interface { InitDBConfig(target *querypb.Target) Open() (err error) Close() + MakePrimary() + MakeNonPrimary() Throttle(priority int, workload string) (result bool) } @@ -130,6 +132,8 @@ type txThrottler struct { } type txThrottlerState interface { + makePrimary() + makeNonPrimary() deallocateResources() StatsUpdate(tabletStats *discovery.TabletHealth) throttle() bool @@ -138,25 +142,24 @@ type txThrottlerState interface { // txThrottlerStateImpl holds the state of an open TxThrottler object. type txThrottlerStateImpl struct { config *tabletenv.TabletConfig + target *querypb.Target txThrottler *txThrottler // throttleMu serializes calls to throttler.Throttler.Throttle(threadId). // That method is required to be called in serial for each threadId. - throttleMu sync.Mutex - throttler throttler.Throttler - stopHealthCheck context.CancelFunc + throttleMu sync.Mutex + throttler throttler.Throttler - healthCheck discovery.HealthCheck - healthCheckChan chan *discovery.TabletHealth - healthCheckCells []string - cellsFromTopo bool + cellsFromTopo bool + healthCheck discovery.HealthCheck + healthCheckCancel context.CancelFunc + healthCheckCells []string + healthCheckChan chan *discovery.TabletHealth + maxLag int64 + wg sync.WaitGroup // tabletTypes stores the tablet types for throttling tabletTypes map[topodatapb.TabletType]bool - - maxLag int64 - done chan bool - waitForTermination sync.WaitGroup } // NewTxThrottler tries to construct a txThrottler from the relevant @@ -221,6 +224,22 @@ func (t *txThrottler) Close() { log.Info("txThrottler: closed") } +// MakePrimary performs a transition to a primary tablet. This will enable healthchecks to +// watch the replication lag state of other tablets. +func (t *txThrottler) MakePrimary() { + if t.state != nil { + t.state.makePrimary() + } +} + +// MakePrimary performs a transition to a non-primary tablet. This disables healthchecks +// for replication lag state if we were primary. +func (t *txThrottler) MakeNonPrimary() { + if t.state != nil { + t.state.makeNonPrimary() + } +} + // Throttle should be called before a new transaction is started. // It returns true if the transaction should not proceed (the caller // should back off). Throttle requires that Open() was previously called @@ -268,61 +287,59 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi tabletTypes[tabletType] = true } - state := &txThrottlerStateImpl{ - config: config, - healthCheckCells: config.TxThrottlerHealthCheckCells, - tabletTypes: tabletTypes, - throttler: t, - txThrottler: txThrottler, - done: make(chan bool, 1), - } - // get cells from topo if none defined in tabletenv config - if len(state.healthCheckCells) == 0 { + var cellsFromTopo bool + healthCheckCells := config.TxThrottlerHealthCheckCells + if len(healthCheckCells) == 0 { ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) defer cancel() - state.healthCheckCells = fetchKnownCells(ctx, txThrottler.topoServer, target) - state.cellsFromTopo = true + healthCheckCells = fetchKnownCells(ctx, txThrottler.topoServer, target) + cellsFromTopo = true } - ctx, cancel := context.WithCancel(context.Background()) - state.stopHealthCheck = cancel - state.initHealthCheckStream(txThrottler.topoServer, target) - state.healthCheck.RegisterStats() - go state.healthChecksProcessor(ctx, txThrottler.topoServer, target) - state.waitForTermination.Add(1) - go state.updateMaxLag() - - return state, nil + return &txThrottlerStateImpl{ + config: config, + cellsFromTopo: cellsFromTopo, + healthCheckCells: healthCheckCells, + tabletTypes: tabletTypes, + target: target, + throttler: t, + txThrottler: txThrottler, + }, nil } -func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) { - ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells) +func (ts *txThrottlerStateImpl) initHealthCheck() { + ts.healthCheck = healthCheckFactory(ts.txThrottler.topoServer, ts.target.Cell, ts.healthCheckCells) ts.healthCheckChan = ts.healthCheck.Subscribe() + ts.healthCheck.RegisterStats() } -func (ts *txThrottlerStateImpl) closeHealthCheckStream() { +func (ts *txThrottlerStateImpl) closeHealthCheck() { if ts.healthCheck == nil { return } - ts.stopHealthCheck() + ts.healthCheckCancel() + ts.wg.Wait() ts.healthCheck.Close() + ts.healthCheck = nil + ts.maxLag = 0 } -func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { +func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context) { fetchCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() - knownCells := fetchKnownCells(fetchCtx, topoServer, target) + knownCells := fetchKnownCells(fetchCtx, ts.txThrottler.topoServer, ts.target) if !slices.Equal(knownCells, ts.healthCheckCells) { log.Info("txThrottler: restarting healthcheck stream due to topology cells update") ts.healthCheckCells = knownCells - ts.closeHealthCheckStream() - ts.initHealthCheckStream(topoServer, target) + ts.closeHealthCheck() + ts.initHealthCheck() } } -func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { +func (ts *txThrottlerStateImpl) healthCheckProcessor(ctx context.Context) { + defer ts.wg.Done() var cellsUpdateTicks <-chan time.Time if ts.cellsFromTopo { ticker := time.NewTicker(ts.config.TxThrottlerTopoRefreshInterval) @@ -334,18 +351,39 @@ func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoS case <-ctx.Done(): return case <-cellsUpdateTicks: - ts.updateHealthCheckCells(ctx, topoServer, target) + ts.updateHealthCheckCells(ctx) case th := <-ts.healthCheckChan: ts.StatsUpdate(th) } } } +func (ts *txThrottlerStateImpl) makePrimary() { + ts.initHealthCheck() + var ctx context.Context + ctx, ts.healthCheckCancel = context.WithCancel(context.Background()) + + ts.wg.Add(1) + go ts.healthCheckProcessor(ctx) + + ts.wg.Add(1) + go ts.updateMaxLag(ctx) +} + +func (ts *txThrottlerStateImpl) makeNonPrimary() { + ts.closeHealthCheck() +} + func (ts *txThrottlerStateImpl) throttle() bool { if ts.throttler == nil { log.Error("txThrottler: throttle called after deallocateResources was called") return false } + // return false if we are not watching lag + if ts.healthCheck == nil { + return false + } + // Serialize calls to ts.throttle.Throttle() ts.throttleMu.Lock() defer ts.throttleMu.Unlock() @@ -356,17 +394,17 @@ func (ts *txThrottlerStateImpl) throttle() bool { ts.throttler.Throttle(0 /* threadId */) > 0 } -func (ts *txThrottlerStateImpl) updateMaxLag() { - defer ts.waitForTermination.Done() +func (ts *txThrottlerStateImpl) updateMaxLag(ctx context.Context) { + defer ts.wg.Done() // We use half of the target lag to ensure we have enough resolution to see changes in lag below that value ticker := time.NewTicker(time.Duration(ts.config.TxThrottlerConfig.TargetReplicationLagSec/2) * time.Second) defer ticker.Stop() -outerloop: for { select { + case <-ctx.Done(): + return case <-ticker.C: var maxLag uint32 - for tabletType := range ts.tabletTypes { maxLagPerTabletType := ts.throttler.MaxLag(tabletType) if maxLagPerTabletType > maxLag { @@ -374,28 +412,23 @@ outerloop: } } atomic.StoreInt64(&ts.maxLag, int64(maxLag)) - case <-ts.done: - break outerloop } } } func (ts *txThrottlerStateImpl) deallocateResources() { - // Close healthcheck and topo watchers - ts.closeHealthCheckStream() - ts.healthCheck = nil + // Close healthcheck and max lag updater + ts.closeHealthCheck() - ts.done <- true - ts.waitForTermination.Wait() // After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not // to be executing, so we can safely close the throttler. ts.throttler.Close() ts.throttler = nil } -// StatsUpdate updates the health of a tablet with the given healthcheck. +// StatsUpdate updates the health of a tablet with the given healthcheck, when primary. func (ts *txThrottlerStateImpl) StatsUpdate(tabletStats *discovery.TabletHealth) { - if len(ts.tabletTypes) == 0 { + if ts.healthCheck == nil || len(ts.tabletTypes) == 0 { return } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 25c855b898b..3e46e79c70b 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -51,6 +51,7 @@ func TestDisabledThrottler(t *testing.T) { Shard: "shard", }) assert.Nil(t, throttler.Open()) + throttler.MakePrimary() assert.False(t, throttler.Throttle(0, "some-workload")) throttlerImpl, _ := throttler.(*txThrottler) assert.Zero(t, throttlerImpl.throttlerRunning.Get()) @@ -144,9 +145,17 @@ func TestEnabledThrottler(t *testing.T) { assert.Equal(t, map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, throttlerStateImpl.tabletTypes) assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get()) - // Stop the go routine that keeps updating the cached shard's max lag to prevent it from changing the value in a + // check .throttle() returns false when non-primary and healthCheck is nil (not watching for lag) + assert.False(t, throttlerStateImpl.throttle()) + assert.Nil(t, throttlerStateImpl.healthCheck) + + // makePrimary and confirm healthcheck starts + throttlerStateImpl.makePrimary() + assert.NotNil(t, throttlerStateImpl.healthCheck) + + // Stop the lag/healthcheck go routines that keeps updating the cached shard's max lag to prevent it from changing the value in a // way that will interfere with how we manipulate that value in our tests to evaluate different cases: - throttlerStateImpl.done <- true + throttlerStateImpl.healthCheckCancel() // 1 should not throttle due to return value of underlying Throttle(), despite high lag atomic.StoreInt64(&throttlerStateImpl.maxLag, 20) @@ -243,12 +252,10 @@ type mockTxThrottlerState struct { shouldThrottle bool } -func (t *mockTxThrottlerState) deallocateResources() { - -} -func (t *mockTxThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) { - -} +func (t *mockTxThrottlerState) makePrimary() {} +func (t *mockTxThrottlerState) makeNonPrimary() {} +func (t *mockTxThrottlerState) deallocateResources() {} +func (t *mockTxThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) {} func (t *mockTxThrottlerState) throttle() bool { return t.shouldThrottle