Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txthrottler: move ThrottlerInterface to go/vt/throttler, use slices pkg, add stats #16248

Merged
merged 8 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go/vt/throttler/demo/throttler_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ type replica struct {

// throttler is used to enforce the maximum rate at which replica applies
// transactions. It must not be confused with the client's throttler.
throttler *throttler.Throttler
throttler throttler.Throttler
lastHealthUpdate time.Time
lagUpdateInterval time.Duration

Expand Down Expand Up @@ -226,7 +226,7 @@ type client struct {
primary *primary

healthCheck discovery.HealthCheck
throttler *throttler.Throttler
throttler throttler.Throttler

stopChan chan struct{}
wg sync.WaitGroup
Expand Down
10 changes: 5 additions & 5 deletions go/vt/throttler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,16 @@ type managerImpl struct {
// mu guards all fields in this group.
mu sync.Mutex
// throttlers tracks all running throttlers (by their name).
throttlers map[string]*Throttler
throttlers map[string]Throttler
}

func newManager() *managerImpl {
return &managerImpl{
throttlers: make(map[string]*Throttler),
throttlers: make(map[string]Throttler),
}
}

func (m *managerImpl) registerThrottler(name string, throttler *Throttler) error {
func (m *managerImpl) registerThrottler(name string, throttler Throttler) error {
m.mu.Lock()
defer m.mu.Unlock()

Expand Down Expand Up @@ -207,7 +207,7 @@ func (m *managerImpl) throttlerNamesLocked() []string {

// log returns the most recent changes of the MaxReplicationLag module.
// There will be one result for each processed replication lag record.
func (m *managerImpl) log(throttlerName string) ([]result, error) {
func (m *managerImpl) log(throttlerName string) ([]Result, error) {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -216,5 +216,5 @@ func (m *managerImpl) log(throttlerName string) ([]result, error) {
return nil, fmt.Errorf("throttler: %v does not exist", throttlerName)
}

return t.log(), nil
return t.Log(), nil
}
2 changes: 1 addition & 1 deletion go/vt/throttler/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var (

type managerTestFixture struct {
m *managerImpl
t1, t2 *Throttler
t1, t2 Throttler
}

func (f *managerTestFixture) setUp() error {
Expand Down
20 changes: 10 additions & 10 deletions go/vt/throttler/max_replication_lag_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (m *MaxReplicationLagModule) recalculateRate(lagRecordNow replicationLagRec

m.memory.ageBadRate(now)

r := result{
r := Result{
Now: now,
RateChange: unchangedRate,
lastRateChange: m.lastRateChange,
Expand Down Expand Up @@ -446,7 +446,7 @@ func stateGreater(a, b state) bool {
// and we should not skip the current replica ("lagRecordNow").
// Even if it's the same replica we may skip it and return false because
// we want to wait longer for the propagation of the current rate change.
func (m *MaxReplicationLagModule) isReplicaUnderTest(r *result, now time.Time, testedState state, lagRecordNow replicationLagRecord) bool {
func (m *MaxReplicationLagModule) isReplicaUnderTest(r *Result, now time.Time, testedState state, lagRecordNow replicationLagRecord) bool {
if m.replicaUnderTest == nil {
return true
}
Expand All @@ -472,7 +472,7 @@ func (m *MaxReplicationLagModule) isReplicaUnderTest(r *result, now time.Time, t
return true
}

func (m *MaxReplicationLagModule) increaseRate(r *result, now time.Time, lagRecordNow replicationLagRecord) {
func (m *MaxReplicationLagModule) increaseRate(r *Result, now time.Time, lagRecordNow replicationLagRecord) {
m.markCurrentRateAsBadOrGood(r, now, stateIncreaseRate, unknown)

oldRate := m.rate.Load()
Expand Down Expand Up @@ -560,7 +560,7 @@ func (m *MaxReplicationLagModule) minTestDurationUntilNextIncrease(increase floa
return minDuration
}

func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time, lagRecordNow replicationLagRecord) {
func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *Result, now time.Time, lagRecordNow replicationLagRecord) {
// Guess replication rate based on the difference in the replication lag of this
// particular replica.
lagRecordBefore := m.lagCache(lagRecordNow).atOrAfter(discovery.TabletToMapKey(lagRecordNow.Tablet), m.lastRateChange)
Expand Down Expand Up @@ -631,7 +631,7 @@ func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time,
// guessReplicationRate guesses the actual replication rate based on the new bac
// Note that "lagDifference" can be positive (lag increased) or negative (lag
// decreased).
func (m *MaxReplicationLagModule) guessReplicationRate(r *result, avgPrimaryRate float64, lagBefore, lagNow int64, lagDifference, d time.Duration) (int64, string) {
func (m *MaxReplicationLagModule) guessReplicationRate(r *Result, avgPrimaryRate float64, lagBefore, lagNow int64, lagDifference, d time.Duration) (int64, string) {
// avgReplicationRate is the average rate (per second) at which the replica
// applied transactions from the replication stream. We infer the value
// from the relative change in the replication lag.
Expand Down Expand Up @@ -676,14 +676,14 @@ func (m *MaxReplicationLagModule) guessReplicationRate(r *result, avgPrimaryRate
return int64(newRate), reason
}

func (m *MaxReplicationLagModule) emergency(r *result, now time.Time, lagRecordNow replicationLagRecord) {
func (m *MaxReplicationLagModule) emergency(r *Result, now time.Time, lagRecordNow replicationLagRecord) {
m.markCurrentRateAsBadOrGood(r, now, stateEmergency, unknown)

decreaseReason := fmt.Sprintf("replication lag went beyond max: %d > %d", lagRecordNow.lag(), m.config.MaxReplicationLagSec)
m.decreaseRateByPercentage(r, now, lagRecordNow, stateEmergency, m.config.EmergencyDecrease, decreaseReason)
}

func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *result, now time.Time, lagRecordNow replicationLagRecord, newState state, decrease float64, decreaseReason string) {
func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *Result, now time.Time, lagRecordNow replicationLagRecord, newState state, decrease float64, decreaseReason string) {
oldRate := m.rate.Load()
rate := int64(float64(oldRate) - float64(oldRate)*decrease)
if rate == 0 {
Expand All @@ -695,7 +695,7 @@ func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *result, now time.T
m.updateRate(r, newState, rate, reason, now, lagRecordNow, m.config.MinDurationBetweenDecreases())
}

func (m *MaxReplicationLagModule) updateRate(r *result, newState state, rate int64, reason string, now time.Time, lagRecordNow replicationLagRecord, testDuration time.Duration) {
func (m *MaxReplicationLagModule) updateRate(r *Result, newState state, rate int64, reason string, now time.Time, lagRecordNow replicationLagRecord, testDuration time.Duration) {
oldRate := m.rate.Load()

m.currentState = newState
Expand Down Expand Up @@ -723,7 +723,7 @@ func (m *MaxReplicationLagModule) updateRate(r *result, newState state, rate int

// markCurrentRateAsBadOrGood determines the actual rate between the last rate
// change and "now" and determines if that rate was bad or good.
func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *result, now time.Time, newState state, replicationLagChange replicationLagChange) {
func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *Result, now time.Time, newState state, replicationLagChange replicationLagChange) {
if m.lastRateChange.IsZero() {
// Module was just started. We don't have any data points yet.
r.GoodOrBad = ignoredRate
Expand Down Expand Up @@ -797,6 +797,6 @@ func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *result, now time
}
}

func (m *MaxReplicationLagModule) log() []result {
func (m *MaxReplicationLagModule) log() []Result {
return m.results.latestValues()
}
24 changes: 12 additions & 12 deletions go/vt/throttler/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ state (old/tested/new): {{.OldState}}/{{.TestedState}}/{{.NewState}}
lag before: {{.LagBefore}} ({{.AgeOfBeforeLag}} ago) rates (primary/replica): {{.PrimaryRate}}/{{.GuessedReplicationRate}} backlog (old/new): {{.GuessedReplicationBacklogOld}}/{{.GuessedReplicationBacklogNew}}
reason: {{.Reason}}`))

// result is generated by the MaxReplicationLag module for each processed
// Result is generated by the MaxReplicationLag module for each processed
// "replicationLagRecord".
// It captures the details and the decision of the processing.
type result struct {
type Result struct {
Now time.Time
RateChange rateChange
lastRateChange time.Time
Expand All @@ -80,33 +80,33 @@ type result struct {
GuessedReplicationBacklogNew int
}

func (r result) String() string {
func (r Result) String() string {
var b strings.Builder
if err := resultStringTemplate.Execute(&b, r); err != nil {
panic(fmt.Sprintf("failed to Execute() template: %v", err))
}
return b.String()
}

func (r result) Alias() string {
func (r Result) Alias() string {
return topoproto.TabletAliasString(r.LagRecordNow.Tablet.Alias)
}

func (r result) TimeSinceLastRateChange() string {
func (r Result) TimeSinceLastRateChange() string {
if r.lastRateChange.IsZero() {
return "n/a"
}
return fmt.Sprintf("%.1fs", r.Now.Sub(r.lastRateChange).Seconds())
}

func (r result) LagBefore() string {
func (r Result) LagBefore() string {
if r.LagRecordBefore.isZero() {
return "n/a"
}
return fmt.Sprintf("%ds", r.LagRecordBefore.Stats.ReplicationLagSeconds)
}

func (r result) AgeOfBeforeLag() string {
func (r Result) AgeOfBeforeLag() string {
if r.LagRecordBefore.isZero() {
return "n/a"
}
Expand All @@ -123,18 +123,18 @@ type resultRing struct {
// started reusing entries.
wrapped bool
// values is the underlying ring buffer.
values []result
values []Result
}

// newResultRing creates a new resultRing.
func newResultRing(capacity int) *resultRing {
return &resultRing{
values: make([]result, capacity),
values: make([]Result, capacity),
}
}

// add inserts a new result into the ring buffer.
func (rr *resultRing) add(r result) {
func (rr *resultRing) add(r Result) {
rr.mu.Lock()
defer rr.mu.Unlock()

Expand All @@ -148,7 +148,7 @@ func (rr *resultRing) add(r result) {

// latestValues returns all values of the buffer. Entries are sorted in reverse
// chronological order i.e. newer items come first.
func (rr *resultRing) latestValues() []result {
func (rr *resultRing) latestValues() []Result {
rr.mu.Lock()
defer rr.mu.Unlock()

Expand All @@ -162,7 +162,7 @@ func (rr *resultRing) latestValues() []result {
count = rr.position
}

results := make([]result, count)
results := make([]Result, count)
for i := 0; i < count; i++ {
pos := start - i
if pos < 0 {
Expand Down
20 changes: 10 additions & 10 deletions go/vt/throttler/result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

var (
resultIncreased = result{
resultIncreased = Result{
Now: sinceZero(1234 * time.Millisecond),
RateChange: increasedRate,
lastRateChange: sinceZero(1 * time.Millisecond),
Expand All @@ -46,7 +46,7 @@ var (
GuessedReplicationBacklogOld: 0,
GuessedReplicationBacklogNew: 0,
}
resultDecreased = result{
resultDecreased = Result{
Now: sinceZero(5000 * time.Millisecond),
RateChange: decreasedRate,
lastRateChange: sinceZero(1234 * time.Millisecond),
Expand All @@ -68,7 +68,7 @@ var (
GuessedReplicationBacklogOld: 10,
GuessedReplicationBacklogNew: 20,
}
resultEmergency = result{
resultEmergency = Result{
Now: sinceZero(10123 * time.Millisecond),
RateChange: decreasedRate,
lastRateChange: sinceZero(5000 * time.Millisecond),
Expand All @@ -94,7 +94,7 @@ var (

func TestResultString(t *testing.T) {
testcases := []struct {
r result
r Result
want string
}{
{
Expand Down Expand Up @@ -134,24 +134,24 @@ reason: emergency state decreased the rate`,

func TestResultRing(t *testing.T) {
// Test data.
r1 := result{Reason: "r1"}
r2 := result{Reason: "r2"}
r3 := result{Reason: "r3"}
r1 := Result{Reason: "r1"}
r2 := Result{Reason: "r2"}
r3 := Result{Reason: "r3"}

rr := newResultRing(2)

// Use the ring partially.
rr.add(r1)
got, want := rr.latestValues(), []result{r1}
got, want := rr.latestValues(), []Result{r1}
require.Equal(t, want, got, "items not correctly added to resultRing")

// Use it fully.
rr.add(r2)
got, want = rr.latestValues(), []result{r2, r1}
got, want = rr.latestValues(), []Result{r2, r1}
require.Equal(t, want, got, "items not correctly added to resultRing")

// Let it wrap.
rr.add(r3)
got, want = rr.latestValues(), []result{r3, r2}
got, want = rr.latestValues(), []Result{r3, r2}
require.Equal(t, want, got, "resultRing did not wrap correctly")
}
Loading
Loading