Skip to content

Commit

Permalink
Removed the use of the legacy healthcheck in the tx_throttler
Browse files Browse the repository at this point in the history
Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>
  • Loading branch information
frouioui committed Jun 20, 2022
1 parent 499ff49 commit a809a13
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 56 deletions.
19 changes: 17 additions & 2 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"strings"
"sync"
"time"
"vitess.io/vitess/go/netutil"

"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/stats"
Expand Down Expand Up @@ -85,8 +86,8 @@ var (

// See the documentation for NewHealthCheck below for an explanation of these parameters.
const (
defaultHealthCheckRetryDelay = 5 * time.Second
defaultHealthCheckTimeout = 1 * time.Minute
DefaultHealthCheckRetryDelay = 5 * time.Second
DefaultHealthCheckTimeout = 1 * time.Minute

// DefaultTopoReadConcurrency is used as the default value for the topoReadConcurrency parameter of a TopologyWatcher.
DefaultTopoReadConcurrency int = 5
Expand Down Expand Up @@ -166,6 +167,8 @@ type tabletAliasString string

// HealthCheck declares what the TabletGateway needs from the HealthCheck
type HealthCheck interface {
TabletRecorder

// CacheStatus returns a displayable version of the health check cache.
CacheStatus() TabletsCacheStatusList

Expand Down Expand Up @@ -899,3 +902,15 @@ func (hc *HealthCheckImpl) stateChecksum() int64 {

return int64(crc32.ChecksumIEEE(buf.Bytes()))
}

// TabletToMapKey creates a key to the map from tablet's host and ports.
// It should only be used in discovery and related module.
func TabletToMapKey(tablet *topodata.Tablet) string {
parts := make([]string, 0, 1)
for name, port := range tablet.PortMap {
parts = append(parts, netutil.JoinHostPort(name, port))
}
sort.Strings(parts)
parts = append([]string{tablet.Hostname}, parts...)
return strings.Join(parts, ",")
}
14 changes: 1 addition & 13 deletions go/vt/discovery/legacy_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ type legacyTabletHealth struct {

// NewLegacyDefaultHealthCheck creates a new LegacyHealthCheck object with a default configuration.
func NewLegacyDefaultHealthCheck() LegacyHealthCheck {
return NewLegacyHealthCheck(defaultHealthCheckRetryDelay, defaultHealthCheckTimeout)
return NewLegacyHealthCheck(DefaultHealthCheckRetryDelay, DefaultHealthCheckTimeout)
}

// NewLegacyHealthCheck creates a new LegacyHealthCheck object.
Expand Down Expand Up @@ -960,15 +960,3 @@ func (hc *LegacyHealthCheckImpl) Close() error {

return nil
}

// TabletToMapKey creates a key to the map from tablet's host and ports.
// It should only be used in discovery and related module.
func TabletToMapKey(tablet *topodatapb.Tablet) string {
parts := make([]string, 0, 1)
for name, port := range tablet.PortMap {
parts = append(parts, netutil.JoinHostPort(name, port))
}
sort.Strings(parts)
parts = append([]string{tablet.Hostname}, parts...)
return strings.Join(parts, ",")
}
30 changes: 18 additions & 12 deletions go/vt/throttler/demo/throttler_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ limitations under the License.
package main

import (
"context"
"flag"
"math/rand"
"net/http"
"sync"
"testing"
"time"
"vitess.io/vitess/go/vt/topo"

"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/logutil"
Expand Down Expand Up @@ -103,9 +105,8 @@ type replica struct {
wg sync.WaitGroup
}

func newReplica(lagUpdateInterval, degrationInterval, degrationDuration time.Duration) *replica {
func newReplica(lagUpdateInterval, degrationInterval, degrationDuration time.Duration, ts *topo.Server) *replica {
t := &testing.T{}
ts := memorytopo.NewServer("cell1")
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
fakeTablet := testlib.NewFakeTablet(t, wr, "cell1", 0,
topodatapb.TabletType_REPLICA, nil, testlib.TabletKeyspaceShard(t, "ks", "-80"))
Expand Down Expand Up @@ -213,28 +214,30 @@ func (r *replica) stop() {
type client struct {
primary *primary

healthCheck discovery.LegacyHealthCheck
healthCheck discovery.HealthCheck
throttler *throttler.Throttler

stopChan chan struct{}
wg sync.WaitGroup
stopChan chan struct{}
wg sync.WaitGroup
healthcheckCh chan *discovery.TabletHealth
}

func newClient(primary *primary, replica *replica) *client {
func newClient(primary *primary, replica *replica, ts *topo.Server) *client {
t, err := throttler.NewThrottler("client", "TPS", 1, throttler.MaxRateModuleDisabled, 5 /* seconds */)
if err != nil {
log.Fatal(err)
}

healthCheck := discovery.NewLegacyHealthCheck(5*time.Second, 1*time.Minute)
healthCheck := discovery.NewHealthCheck(context.Background(), 5*time.Second, 1*time.Minute, ts, "cell1", "")
c := &client{
primary: primary,
healthCheck: healthCheck,
throttler: t,
stopChan: make(chan struct{}),
}
c.healthCheck.SetListener(c, false /* sendDownEvents */)
c.healthCheck.AddTablet(replica.fakeTablet.Tablet, "name")
healthcheckCh := c.healthCheck.Subscribe()
c.healthcheckCh = healthcheckCh
c.healthCheck.AddTablet(replica.fakeTablet.Tablet)
return c
}

Expand All @@ -250,6 +253,8 @@ func (c *client) loop() {
select {
case <-c.stopChan:
return
case th := <-c.healthcheckCh:
c.StatsUpdate(th)
default:
}

Expand All @@ -276,7 +281,7 @@ func (c *client) stop() {
// StatsUpdate implements discovery.LegacyHealthCheckStatsListener.
// It gets called by the healthCheck instance every time a tablet broadcasts
// a health update.
func (c *client) StatsUpdate(ts *discovery.LegacyTabletStats) {
func (c *client) StatsUpdate(ts *discovery.TabletHealth) {
// Ignore unless REPLICA or RDONLY.
if ts.Target.TabletType != topodatapb.TabletType_REPLICA && ts.Target.TabletType != topodatapb.TabletType_RDONLY {
return
Expand All @@ -294,9 +299,10 @@ func main() {
})

log.Infof("start rate set to: %v", *rate)
replica := newReplica(*lagUpdateInterval, *replicaDegrationInterval, *replicaDegrationDuration)
ts := memorytopo.NewServer("cell1")
replica := newReplica(*lagUpdateInterval, *replicaDegrationInterval, *replicaDegrationDuration, ts)
primary := &primary{replica: replica}
client := newClient(primary, replica)
client := newClient(primary, replica, ts)
client.run()

time.Sleep(*duration)
Expand Down
16 changes: 8 additions & 8 deletions go/vt/throttler/max_replication_lag_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (m *MaxReplicationLagModule) resetConfiguration() {
}

// RecordReplicationLag records the current replication lag for processing.
func (m *MaxReplicationLagModule) RecordReplicationLag(t time.Time, ts *discovery.LegacyTabletStats) {
func (m *MaxReplicationLagModule) RecordReplicationLag(t time.Time, th *discovery.TabletHealth) {
m.mutableConfigMu.Lock()
if m.mutableConfig.MaxReplicationLagSec == ReplicationLagModuleDisabled {
m.mutableConfigMu.Unlock()
Expand All @@ -248,7 +248,7 @@ func (m *MaxReplicationLagModule) RecordReplicationLag(t time.Time, ts *discover

// Buffer data point for now to unblock the LegacyHealthCheck listener and process
// it asynchronously in ProcessRecords().
m.lagRecords <- replicationLagRecord{t, *ts}
m.lagRecords <- replicationLagRecord{t, *th}
}

// ProcessRecords is the main loop, run in a separate Go routine, which
Expand Down Expand Up @@ -331,7 +331,7 @@ func (m *MaxReplicationLagModule) recalculateRate(lagRecordNow replicationLagRec
var clear bool
var clearReason string

if m.lagCache(lagRecordNow).ignoreSlowReplica(lagRecordNow.Key) {
if m.lagCache(lagRecordNow).ignoreSlowReplica(discovery.TabletToMapKey(lagRecordNow.Tablet)) {
r.Reason = fmt.Sprintf("skipping this replica because it's among the %d slowest %v tablets", m.getNSlowestReplicasConfig(lagRecordNow), lagRecordNow.Target.TabletType.String())
goto logResult
}
Expand Down Expand Up @@ -394,7 +394,7 @@ func (m *MaxReplicationLagModule) clearReplicaUnderTest(now time.Time, testedSta

// Verify that the current replica under test is not in an error state.
lr := lagRecordNow
if m.replicaUnderTest.key != lr.Key {
if m.replicaUnderTest.key != discovery.TabletToMapKey(lr.Tablet) {
lr = m.lagCacheByType(m.replicaUnderTest.tabletType).latest(m.replicaUnderTest.key)
}
if lr.isZero() {
Expand Down Expand Up @@ -445,7 +445,7 @@ func (m *MaxReplicationLagModule) isReplicaUnderTest(r *result, now time.Time, t
return true
}

if m.replicaUnderTest.key != lagRecordNow.Key {
if m.replicaUnderTest.key != discovery.TabletToMapKey(lagRecordNow.Tablet) {
r.Reason = fmt.Sprintf("skipping this replica because we're waiting for the next lag record from the 'replica under test': %v", m.replicaUnderTest.alias)
return false
}
Expand Down Expand Up @@ -557,7 +557,7 @@ func (m *MaxReplicationLagModule) minTestDurationUntilNextIncrease(increase floa
func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time, lagRecordNow replicationLagRecord) {
// Guess replication rate based on the difference in the replication lag of this
// particular replica.
lagRecordBefore := m.lagCache(lagRecordNow).atOrAfter(lagRecordNow.Key, m.lastRateChange)
lagRecordBefore := m.lagCache(lagRecordNow).atOrAfter(discovery.TabletToMapKey(lagRecordNow.Tablet), m.lastRateChange)
if lagRecordBefore.isZero() {
// We should see at least "lagRecordNow" here because we did just insert it
// in processRecord().
Expand Down Expand Up @@ -592,7 +592,7 @@ func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time,

if replicationLagChange == equal {
// The replication lag did not change. Keep going at the current rate.
r.Reason = fmt.Sprintf("did not decrease the rate because the lag did not change (assuming a 1s error margin)") //nolint
r.Reason = fmt.Sprintf("did not decrease the rate because the lag did not change (assuming a 1s error margin)") // nolint
return
}

Expand Down Expand Up @@ -705,7 +705,7 @@ func (m *MaxReplicationLagModule) updateRate(r *result, newState state, rate int
}

m.lastRateChange = now
m.replicaUnderTest = &replicaUnderTest{lagRecordNow.Key, topoproto.TabletAliasString(lagRecordNow.Tablet.Alias), lagRecordNow.Target.TabletType, newState, now.Add(testDuration)}
m.replicaUnderTest = &replicaUnderTest{discovery.TabletToMapKey(lagRecordNow.Tablet), topoproto.TabletAliasString(lagRecordNow.Tablet.Alias), lagRecordNow.Target.TabletType, newState, now.Add(testDuration)}

if rate == oldRate {
return
Expand Down
14 changes: 6 additions & 8 deletions go/vt/throttler/max_replication_lag_module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func TestMaxReplicationLagModule_ReplicaUnderTest_LastErrorOrNotUp(t *testing.T)
tf.ratesHistory.add(sinceZero(110*time.Second), 200)
tf.ratesHistory.add(sinceZero(114*time.Second), 400)
rNotUp := lagRecord(sinceZero(115*time.Second), r1, 0)
rNotUp.Up = false
rNotUp.Serving = false
tf.m.replicaLagCache.add(rNotUp)

// r2 @ 150s, 0s lag (lastError no longer set)
Expand Down Expand Up @@ -453,7 +453,7 @@ func TestMaxReplicationLagModule_Increase_BadRateUpperBound(t *testing.T) {
t.Fatal(err)
}

//Assume that a bad value of 150 was set @ 30s and log error
// Assume that a bad value of 150 was set @ 30s and log error
if err := tf.m.memory.markBad(150, sinceZero(30*time.Second)); err != nil {
log.Errorf("tf.m.memory.markBad(150, sinceZero(30*time.Second)) falied : %v", err)
}
Expand Down Expand Up @@ -955,7 +955,7 @@ func lagRecord(t time.Time, uid, lag uint32) replicationLagRecord {
}

// tabletStats creates fake tablet health data.
func tabletStats(uid, lag uint32) discovery.LegacyTabletStats {
func tabletStats(uid, lag uint32) discovery.TabletHealth {
typ := topodatapb.TabletType_REPLICA
if uid == rdonly1 || uid == rdonly2 {
typ = topodatapb.TabletType_RDONLY
Expand All @@ -967,21 +967,19 @@ func tabletStats(uid, lag uint32) discovery.LegacyTabletStats {
Type: typ,
PortMap: map[string]int32{"vt": int32(uid)},
}
return discovery.LegacyTabletStats{
return discovery.TabletHealth{
Tablet: tablet,
Key: discovery.TabletToMapKey(tablet),
Target: &querypb.Target{
Keyspace: "ks1",
Shard: "-80",
TabletType: typ,
},
Up: true,
Serving: true,
Stats: &querypb.RealtimeStats{
ReplicationLagSeconds: lag,
},
TabletExternallyReparentedTimestamp: 22,
LastError: nil,
PrimaryTermStartTime: 22,
LastError: nil,
}
}

Expand Down
16 changes: 8 additions & 8 deletions go/vt/throttler/replication_lag_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,17 @@ func newReplicationLagCache(historyCapacityPerReplica int) *replicationLagCache

// add inserts or updates "r" in the cache for the replica with the key "r.Key".
func (c *replicationLagCache) add(r replicationLagRecord) {
if !r.Up {
if !r.Serving {
// Tablet is down. Do no longer track it.
delete(c.entries, r.Key)
delete(c.ignoredSlowReplicasInARow, r.Key)
delete(c.entries, discovery.TabletToMapKey(r.Tablet))
delete(c.ignoredSlowReplicasInARow, discovery.TabletToMapKey(r.Tablet))
return
}

entry, ok := c.entries[r.Key]
entry, ok := c.entries[discovery.TabletToMapKey(r.Tablet)]
if !ok {
entry = newReplicationLagHistory(c.historyCapacityPerReplica)
c.entries[r.Key] = entry
c.entries[discovery.TabletToMapKey(r.Tablet)] = entry
}

entry.add(r)
Expand Down Expand Up @@ -114,21 +114,21 @@ func (c *replicationLagCache) sortByLag(ignoreNSlowestReplicas int, minimumRepli
for _, v := range c.entries {
record := v.latest()
if int64(record.Stats.ReplicationLagSeconds) >= minimumReplicationLag {
list = append(list, record.LegacyTabletStats)
list = append(list, record.TabletHealth)
i++
}
}
sort.Sort(list)

// Now remember the N slowest replicas.
for i := len(list) - 1; len(list) > 0 && i >= len(list)-ignoreNSlowestReplicas; i-- {
c.slowReplicas[list[i].Key] = true
c.slowReplicas[discovery.TabletToMapKey(list[i].Tablet)] = true
}
}

// byLagAndTabletUID is a slice of discovery.LegacyTabletStats elements that
// implements sort.Interface to sort by replication lag and tablet Uid.
type byLagAndTabletUID []discovery.LegacyTabletStats
type byLagAndTabletUID []discovery.TabletHealth

func (a byLagAndTabletUID) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byLagAndTabletUID) Len() int { return len(a) }
Expand Down
5 changes: 3 additions & 2 deletions go/vt/throttler/replication_lag_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package throttler
import (
"testing"
"time"
"vitess.io/vitess/go/vt/discovery"
)

// TestReplicationLagCache tests that the ring buffer in "replicationLagHistory"
Expand All @@ -27,7 +28,7 @@ import (
// max_replication_lag_module_test.go.
func TestReplicationLagCache(t *testing.T) {
c := newReplicationLagCache(2)
r1Key := tabletStats(r1, 1).Key
r1Key := discovery.TabletToMapKey(tabletStats(r1, 1).Tablet)

// If there is no entry yet, a zero struct is returned.
zeroEntry := c.atOrAfter(r1Key, sinceZero(0*time.Second))
Expand Down Expand Up @@ -73,7 +74,7 @@ func TestReplicationLagCache(t *testing.T) {

func TestReplicationLagCache_SortByLag(t *testing.T) {
c := newReplicationLagCache(2)
r1Key := tabletStats(r1, 1).Key
r1Key := discovery.TabletToMapKey(tabletStats(r1, 1).Tablet)

c.add(lagRecord(sinceZero(1*time.Second), r1, 30))
c.sortByLag(1 /* ignoreNSlowestReplicas */, 30 /* minimumReplicationLag */)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/throttler/replication_lag_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type replicationLagRecord struct {
time time.Time

// LegacyTabletStats holds a copy of the current health data of the tablet.
discovery.LegacyTabletStats
discovery.TabletHealth
}

func (r replicationLagRecord) isZero() bool {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/throttler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ func (t *Throttler) SetMaxRate(rate int64) {
// RecordReplicationLag must be called by users to report the "ts" tablet health
// data observed at "time".
// Note: After Close() is called, this method must not be called anymore.
func (t *Throttler) RecordReplicationLag(time time.Time, ts *discovery.LegacyTabletStats) {
t.maxReplicationLagModule.RecordReplicationLag(time, ts)
func (t *Throttler) RecordReplicationLag(time time.Time, th *discovery.TabletHealth) {
t.maxReplicationLagModule.RecordReplicationLag(time, th)
}

// GetConfiguration returns the configuration of the MaxReplicationLag module.
Expand Down

0 comments on commit a809a13

Please sign in to comment.