Fixed build issue and code comments
Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>
frouioui committed Jun 21, 2022
1 parent 5c66c6e commit 4654a77
Showing 15 changed files with 20 additions and 63 deletions.
3 changes: 1 addition & 2 deletions go/vt/discovery/utils.go
@@ -24,8 +24,7 @@ import (
)

// This file contains helper filter methods to process the unfiltered list of
- // tablets returned by LegacyHealthCheck.GetTabletStatsFrom*.
- // See also legacy_replicationlag.go for a more sophisicated filter used by vtgate.
+ // tablets returned by HealthCheckImpl.GetTabletHealth*.

func TabletHealthReferenceListToValue(thl []*TabletHealth) []TabletHealth {
newTh := []TabletHealth{}
3 changes: 1 addition & 2 deletions go/vt/throttler/demo/throttler_demo.go
@@ -277,8 +277,7 @@ func (c *client) stop() {
c.throttler.Close()
}

- // StatsUpdate implements discovery.LegacyHealthCheckStatsListener.
- // It gets called by the healthCheck instance every time a tablet broadcasts
+ // StatsUpdate gets called by the healthCheck instance every time a tablet broadcasts
// a health update.
func (c *client) StatsUpdate(ts *discovery.TabletHealth) {
// Ignore unless REPLICA or RDONLY.
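The change above drops the reference to the legacy listener interface: the demo client's StatsUpdate is now simply invoked for every tablet health broadcast. A minimal illustrative sketch of wiring such a handler to a stream of health updates, assuming the updates arrive on a channel of *discovery.TabletHealth (how that channel is obtained is omitted, and the helper names are hypothetical):

package demo

import (
	"vitess.io/vitess/go/vt/discovery"
	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// statsHandler is any component with a StatsUpdate method, like the demo
// client above.
type statsHandler interface {
	StatsUpdate(*discovery.TabletHealth)
}

// forwardHealthUpdates relays tablet health broadcasts to the handler,
// keeping only the tablet types a lag-based throttler cares about.
func forwardHealthUpdates(updates <-chan *discovery.TabletHealth, h statsHandler) {
	for th := range updates {
		if th.Target.TabletType != topodatapb.TabletType_REPLICA &&
			th.Target.TabletType != topodatapb.TabletType_RDONLY {
			continue
		}
		h.StatsUpdate(th)
	}
}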
10 changes: 5 additions & 5 deletions go/vt/throttler/max_replication_lag_module.go
@@ -54,7 +54,7 @@ const (
// i.e. we'll ignore lag records with lower lag from other replicas while we're
// waiting for the next record of this replica under test.
type replicaUnderTest struct {
- // key holds the discovery.LegacyTabletStats.Key value for the replica.
+ // key holds the key value for the replica.
key string
alias string
tabletType topodatapb.TabletType
@@ -114,8 +114,8 @@ type MaxReplicationLagModule struct {
// max rate calculation has changed. The field is immutable (set in Start().)
rateUpdateChan chan<- struct{}

- // lagRecords buffers the replication lag records received by the LegacyHealthCheck
- // listener. ProcessRecords() will process them.
+ // lagRecords buffers the replication lag records received by the HealthCheck
+ // subscriber. ProcessRecords() will process them.
lagRecords chan replicationLagRecord
wg sync.WaitGroup

@@ -246,7 +246,7 @@ func (m *MaxReplicationLagModule) RecordReplicationLag(t time.Time, th *discover
}
m.mutableConfigMu.Unlock()

- // Buffer data point for now to unblock the LegacyHealthCheck listener and process
+ // Buffer data point for now to unblock the HealthCheck subscriber and process
// it asynchronously in ProcessRecords().
m.lagRecords <- replicationLagRecord{t, *th}
}
@@ -402,7 +402,7 @@ func (m *MaxReplicationLagModule) clearReplicaUnderTest(now time.Time, testedSta
return true, "it is no longer actively tracked"
}
if lr.LastError != nil {
- // LastError is set i.e. LegacyHealthCheck module cannot connect and the cached
+ // LastError is set i.e. HealthCheck module cannot connect and the cached
// data for the replica might be outdated.
return true, "it has LastError set i.e. is no longer correctly tracked"
}
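The updated comments describe how MaxReplicationLagModule decouples the healthcheck callback from rate recalculation: RecordReplicationLag only enqueues onto the lagRecords channel, and ProcessRecords drains it in its own goroutine. A generic sketch of that buffer-then-process pattern, with illustrative type and method names rather than the module's real ones:

package sketch

import (
	"sync"
	"time"
)

// lagRecord and lagModule are illustrative stand-ins, not the throttler's
// actual types.
type lagRecord struct {
	observed time.Time
	lagSecs  uint32
}

type lagModule struct {
	lagRecords chan lagRecord
	wg         sync.WaitGroup
}

func newLagModule(bufferSize int) *lagModule {
	return &lagModule{lagRecords: make(chan lagRecord, bufferSize)}
}

// recordLag runs on the health-update path, so it only buffers the data point
// to unblock the caller.
func (m *lagModule) recordLag(r lagRecord) {
	m.lagRecords <- r
}

// processRecords drains the buffer in its own goroutine and hands each record
// to the supplied callback.
func (m *lagModule) processRecords(apply func(lagRecord)) {
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		for r := range m.lagRecords {
			apply(r)
		}
	}()
}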
2 changes: 1 addition & 1 deletion go/vt/throttler/max_replication_lag_module_test.go
@@ -225,7 +225,7 @@ func TestMaxReplicationLagModule_ReplicaUnderTest_LastErrorOrNotUp(t *testing.T)

// r2 @ 75s, 0s lag, LastError set
rError := lagRecord(sinceZero(75*time.Second), r2, 0)
- rError.LastError = errors.New("LegacyHealthCheck reporting broken")
+ rError.LastError = errors.New("HealthCheck reporting broken")
tf.m.replicaLagCache.add(rError)

// r1 @ 110s, 0s lag
2 changes: 1 addition & 1 deletion go/vt/throttler/replication_lag_cache.go
@@ -126,7 +126,7 @@ func (c *replicationLagCache) sortByLag(ignoreNSlowestReplicas int, minimumRepli
}
}

- // byLagAndTabletUID is a slice of discovery.LegacyTabletStats elements that
+ // byLagAndTabletUID is a slice of discovery.TabletHealth elements that
// implements sort.Interface to sort by replication lag and tablet Uid.
type byLagAndTabletUID []discovery.TabletHealth

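byLagAndTabletUID implements sort.Interface over discovery.TabletHealth to order replicas by replication lag, breaking ties by tablet UID. A self-contained illustration of the same pattern with placeholder field names:

package sketch

import "sort"

// tabletLag is a placeholder element type; the real slice holds
// discovery.TabletHealth values.
type tabletLag struct {
	uid     uint32
	lagSecs uint32
}

// byLagAndUID sorts primarily by replication lag, then by tablet UID.
type byLagAndUID []tabletLag

func (a byLagAndUID) Len() int      { return len(a) }
func (a byLagAndUID) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byLagAndUID) Less(i, j int) bool {
	if a[i].lagSecs != a[j].lagSecs {
		return a[i].lagSecs < a[j].lagSecs
	}
	return a[i].uid < a[j].uid
}

// Sorting is then just sort.Sort(byLagAndUID(list)).
var _ sort.Interface = byLagAndUID(nil)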
2 changes: 1 addition & 1 deletion go/vt/throttler/replication_lag_record.go
@@ -23,7 +23,7 @@ import (
)

// replicationLagRecord stores the tablet health data for a given point in time.
- // This data is obtained via the LegacyHealthCheck module.
+ // This data is obtained via the HealthCheck module.
type replicationLagRecord struct {
// time is the time at which "value" was observed.
time time.Time
2 changes: 1 addition & 1 deletion go/vt/vtctld/api.go
@@ -62,7 +62,7 @@ const (
jsonContentType = "application/json; charset=utf-8"
)

- // TabletStats represents realtime stats from a discovery.LegacyTabletStats struct.
+ // TabletStats represents realtime stats from a discovery.TabletHealth struct.
type TabletStats struct {
LastError string `json:"last_error,omitempty"`
Realtime *querypb.RealtimeStats `json:"realtime,omitempty"`
25 changes: 1 addition & 24 deletions go/vt/vtgate/buffer/buffer.go
@@ -93,7 +93,7 @@ type Buffer struct {
// In particular, it is used to serialize the following Go routines:
// - 1. Requests which may buffer (RLock, can be run in parallel)
// - 2. Request which starts buffering (based on the seen error)
- // - 3. LegacyHealthCheck listener ("StatsUpdate") which stops buffering
+ // - 3. HealthCheck subscriber ("StatsUpdate") which stops buffering
// - 4. Timer which may stop buffering after -buffer_max_failover_duration
mu sync.RWMutex
// buffers holds a shardBuffer object per shard, even if no failover is in
@@ -171,29 +171,6 @@ func (b *Buffer) HandleKeyspaceEvent(ksevent *discovery.KeyspaceEvent) {
}
}

- // StatsUpdate keeps track of the "tablet_externally_reparented_timestamp" of
- // each primary. This way we can detect the end of a failover.
- // It is part of the discovery.LegacyHealthCheckStatsListener interface.
- func (b *Buffer) StatsUpdate(ts *discovery.LegacyTabletStats) {
- if ts.Target.TabletType != topodatapb.TabletType_PRIMARY {
- panic(fmt.Sprintf("BUG: non-PRIMARY LegacyTabletStats object must not be forwarded: %#v", ts))
- }
-
- timestamp := ts.TabletExternallyReparentedTimestamp
- if timestamp == 0 {
- // Primarys where TabletExternallyReparented was never called will return 0.
- // Ignore them.
- return
- }
-
- sb := b.getOrCreateBuffer(ts.Target.Keyspace, ts.Target.Shard)
- if sb == nil {
- // Buffer is shut down. Ignore all calls.
- return
- }
- sb.recordExternallyReparentedTimestamp(timestamp, ts.Tablet.Alias)
- }
-
// getOrCreateBuffer returns the ShardBuffer for the given keyspace and shard.
// It returns nil if Buffer is shut down and all calls should be ignored.
func (b *Buffer) getOrCreateBuffer(keyspace, shard string) *shardBuffer {
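The comment block above lists the goroutines serialized by Buffer's single RWMutex: parallel requests take the read lock, while starting or stopping buffering takes the write lock. A generic sketch of the double-checked, getOrCreateBuffer-style lookup such a design typically uses; names are placeholders, not the buffer package's internals:

package sketch

import "sync"

// shardBuffer and buffer are placeholders for the real vtgate buffer types.
type shardBuffer struct{ /* per-shard state elided */ }

type buffer struct {
	mu      sync.RWMutex
	buffers map[string]*shardBuffer
}

func newBuffer() *buffer {
	return &buffer{buffers: make(map[string]*shardBuffer)}
}

func (b *buffer) getOrCreate(keyspace, shard string) *shardBuffer {
	key := keyspace + "/" + shard

	// Fast path: most requests find an existing entry under the shared read lock.
	b.mu.RLock()
	sb := b.buffers[key]
	b.mu.RUnlock()
	if sb != nil {
		return sb
	}

	// Slow path: take the write lock and re-check, since another goroutine may
	// have created the entry between the two lock acquisitions.
	b.mu.Lock()
	defer b.mu.Unlock()
	if sb = b.buffers[key]; sb == nil {
		sb = &shardBuffer{}
		b.buffers[key] = sb
	}
	return sb
}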
11 changes: 0 additions & 11 deletions go/vt/vtgate/buffer/buffer_helper_test.go
@@ -31,17 +31,6 @@ func testAllImplementations(t *testing.T, runTest func(t *testing.T, fail failov
})
})

t.Run("LegacyHealthCheck", func(t *testing.T) {
t.Helper()
runTest(t, func(buf *Buffer, tablet *topodatapb.Tablet, keyspace, shard string, now time.Time) {
buf.StatsUpdate(&discovery.LegacyTabletStats{
Tablet: tablet,
Target: &query.Target{Keyspace: keyspace, Shard: shard, TabletType: topodatapb.TabletType_PRIMARY},
TabletExternallyReparentedTimestamp: now.Unix(),
})
})
})

t.Run("KeyspaceEvent", func(t *testing.T) {
t.Helper()
runTest(t, func(buf *Buffer, tablet *topodatapb.Tablet, keyspace, shard string, now time.Time) {
2 changes: 0 additions & 2 deletions go/vt/vtgate/executor_test.go
@@ -776,8 +776,6 @@ func TestExecutorShow(t *testing.T) {
}
utils.MustMatch(t, wantqr, qr, query)

- // The FakeLegacyTablets in FakeLegacyHealthCheck don't have support for these columns/values
- // So let's just be sure the statement works and we get the expected results (none)
query = "show vitess_replication_status"
qr, err = executor.Execute(ctx, "TestExecute", session, query, nil)
require.NoError(t, err)
1 change: 0 additions & 1 deletion go/vt/vtgate/vtgate.go
@@ -141,7 +141,6 @@ type VTGate struct {
vsm *vstreamManager
txConn *TxConn
gw *TabletGateway
- pv plancontext.PlannerVersion

// stats objects.
// TODO(sougou): This needs to be cleaned up. There
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
@@ -41,7 +41,7 @@ import (

// TxThrottler throttles transactions based on replication lag.
// It's a thin wrapper around the throttler found in vitess/go/vt/throttler.
- // It uses a discovery.LegacyHealthCheck to send replication-lag updates to the wrapped throttler.
+ // It uses a discovery.HealthCheck to send replication-lag updates to the wrapped throttler.
//
// Intended Usage:
// // Assuming topoServer is a topo.Server variable pointing to a Vitess topology server.
@@ -348,7 +348,7 @@ func (ts *txThrottlerState) deallocateResources() {
ts.throttler = nil
}

- // StatsUpdate is part of the LegacyHealthCheckStatsListener interface.
+ // StatsUpdate updates the health of a tablet with the given healthcheck.
func (ts *txThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) {
// Ignore PRIMARY and RDONLY stats.
// We currently do not monitor RDONLY tablets for replication lag. RDONLY tablets are not
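Per the updated comment, txThrottlerState.StatsUpdate now feeds TabletHealth records straight to the wrapped throttler, ignoring PRIMARY and RDONLY stats. An illustrative sketch of that filter-and-forward step; the lagRecorder interface and wiring are assumptions, while the RecordReplicationLag signature mirrors the one shown earlier in this diff:

package sketch

import (
	"time"

	"vitess.io/vitess/go/vt/discovery"
	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// lagRecorder stands in for the wrapped throttler; the method signature
// mirrors MaxReplicationLagModule.RecordReplicationLag as shown in this diff.
type lagRecorder interface {
	RecordReplicationLag(t time.Time, th *discovery.TabletHealth)
}

// statsUpdate forwards only REPLICA health updates; PRIMARY and RDONLY stats
// are ignored, as the comment above describes.
func statsUpdate(rec lagRecorder, th *discovery.TabletHealth) {
	if th.Target.TabletType != topodatapb.TabletType_REPLICA {
		return
	}
	rec.RecordReplicationLag(time.Now(), th)
}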
2 changes: 1 addition & 1 deletion go/vt/worker/executor.go
@@ -185,7 +185,7 @@ func (e *executor) fetchWithRetries(ctx context.Context, action func(ctx context
}
return vterrors.Wrapf(err, "interrupted while trying to run a command on tablet %v", tabletString)
case <-time.After(*executeFetchRetryTime):
- // Retry 30s after the failure using the current primary seen by the LegacyHealthCheck.
+ // Retry 30s after the failure using the current primary seen by the HealthCheck.
}
isRetry = true
}
12 changes: 4 additions & 8 deletions go/vt/worker/split_clone.go
@@ -422,7 +422,7 @@ func (scw *SplitCloneWorker) Run(ctx context.Context) error {
// After Close returned, we can be sure that it won't call our listener
// implementation (method StatsUpdate) anymore.
if err := scw.healthCheck.Close(); err != nil {
- scw.wr.Logger().Errorf2(err, "LegacyHealthCheck.Close() failed")
+ scw.wr.Logger().Errorf2(err, "HealthCheck.Close() failed")
}
}

@@ -859,7 +859,7 @@ func (scw *SplitCloneWorker) findDestinationPrimarys(ctx context.Context) error
}
primarys := scw.healthCheck.GetHealthyTabletStats(&querypb.Target{Keyspace: si.Keyspace(), Shard: si.ShardName(), TabletType: topodatapb.TabletType_PRIMARY})
if len(primarys) == 0 {
- return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "cannot find PRIMARY tablet for destination shard for %v/%v (in cell: %v) in LegacyHealthCheck: empty LegacyTabletStats list", si.Keyspace(), si.ShardName(), scw.cell)
+ return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "cannot find PRIMARY tablet for destination shard for %v/%v (in cell: %v) in HealthCheck: empty TabletHealth list", si.Keyspace(), si.ShardName(), scw.cell)
}
primary := primarys[0]

@@ -869,7 +869,7 @@

scw.wr.Logger().Infof("Using tablet %v as destination primary for %v/%v", topoproto.TabletAliasString(primary.Tablet.Alias), si.Keyspace(), si.ShardName())
}
scw.wr.Logger().Infof("NOTE: The used primary of a destination shard might change over the course of the copy e.g. due to a reparent. The LegacyHealthCheck module will track and log primary changes and any error message will always refer the actually used primary address.")
scw.wr.Logger().Infof("NOTE: The used primary of a destination shard might change over the course of the copy e.g. due to a reparent. The HealthCheck module will track and log primary changes and any error message will always refer the actually used primary address.")

return nil
}
@@ -1357,13 +1357,9 @@ func (scw *SplitCloneWorker) createKeyResolver(td *tabletmanagerdatapb.TableDefi
return newV3ResolverFromTableDefinition(scw.keyspaceSchema, td)
}

- // StatsUpdate receives replication lag updates for each destination primary
+ // StatsUpdate receives replication lag updates from the healthcheck for each destination primary
// and forwards them to the respective throttler instance.
- // It also forwards any update to the LegacyTabletStatsCache to keep it up to date.
- // It is part of the discovery.LegacyHealthCheckStatsListener interface.
func (scw *SplitCloneWorker) StatsUpdate(ts *discovery.TabletHealth) {
- // scw.tsc.StatsUpdate(ts)
-
// Ignore unless REPLICA or RDONLY.
if ts.Target.TabletType != topodatapb.TabletType_REPLICA && ts.Target.TabletType != topodatapb.TabletType_RDONLY {
return
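findDestinationPrimarys above asks the healthcheck for healthy PRIMARY tablets of each destination shard and fails the precondition when none are found. A hedged sketch of that lookup, reusing the GetHealthyTabletStats call visible in the diff; the surrounding helper and its exact signature are assumptions:

package sketch

import (
	"fmt"

	"vitess.io/vitess/go/vt/discovery"
	querypb "vitess.io/vitess/go/vt/proto/query"
	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
	"vitess.io/vitess/go/vt/topo/topoproto"
)

// destinationPrimaryAlias returns the alias of the first healthy PRIMARY the
// healthcheck reports for the shard, mirroring the precondition error above.
func destinationPrimaryAlias(hc *discovery.HealthCheckImpl, keyspace, shard string) (string, error) {
	primaries := hc.GetHealthyTabletStats(&querypb.Target{
		Keyspace:   keyspace,
		Shard:      shard,
		TabletType: topodatapb.TabletType_PRIMARY,
	})
	if len(primaries) == 0 {
		return "", fmt.Errorf("cannot find PRIMARY tablet for %v/%v in HealthCheck: empty TabletHealth list", keyspace, shard)
	}
	return topoproto.TabletAliasString(primaries[0].Tablet.Alias), nil
}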
2 changes: 1 addition & 1 deletion go/vt/worker/tablet_provider.go
@@ -75,7 +75,7 @@ func (p *singleTabletProvider) description() string {
}

// shardTabletProvider returns a random healthy RDONLY tablet for a given
- // keyspace and shard. It uses the LegacyHealthCheck module to retrieve the tablets.
+ // keyspace and shard. It uses the HealthCheck module to retrieve the tablets.
type shardTabletProvider struct {
hc *discovery.HealthCheckImpl
tracker *TabletTracker
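shardTabletProvider's description says it returns a random healthy RDONLY tablet via the HealthCheck module. An illustrative sketch of such a selection, again assuming the GetHealthyTabletStats signature seen above; the helper itself is hypothetical:

package sketch

import (
	"fmt"
	"math/rand"

	"vitess.io/vitess/go/vt/discovery"
	querypb "vitess.io/vitess/go/vt/proto/query"
	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// pickRdonly asks the healthcheck for all healthy RDONLY tablets of the shard
// and picks one at random to spread the load.
func pickRdonly(hc *discovery.HealthCheckImpl, keyspace, shard string) (*topodatapb.Tablet, error) {
	candidates := hc.GetHealthyTabletStats(&querypb.Target{
		Keyspace:   keyspace,
		Shard:      shard,
		TabletType: topodatapb.TabletType_RDONLY,
	})
	if len(candidates) == 0 {
		return nil, fmt.Errorf("no healthy RDONLY tablet available for %v/%v", keyspace, shard)
	}
	return candidates[rand.Intn(len(candidates))].Tablet, nil
}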
