Skip to content

Commit

Permalink
Reset the current lag when closing the replication lag reader. (#12683)…
Browse files Browse the repository at this point in the history
… (#12743)

* Reset the current lag when closing the replication lag reader.



* Fix tests.



---------

Signed-off-by: Johan Stenberg <johanstenberg92@github.com>
Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>
Co-authored-by: Johan Stenberg <johanstenberg92@github.com>
  • Loading branch information
frouioui and johanstenberg92 authored Mar 29, 2023
1 parent 0369347 commit b5196e8
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 17 deletions.
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletserver/repltracker/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ func (r *heartbeatReader) Close() {
}
r.ticks.Stop()
r.pool.Close()

currentLagNs.Set(0)

r.isOpen = false
log.Info("Heartbeat Reader: closed")
}
Expand Down
51 changes: 46 additions & 5 deletions go/vt/vttablet/tabletserver/repltracker/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,13 @@ import (
func TestReaderReadHeartbeat(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()
tr := newReader(db, mockNowFunc)

now := time.Now()
tr := newReader(db, &now)
defer tr.Close()

tr.pool.Open(tr.env.Config().DB.AppWithDB(), tr.env.Config().DB.DbaWithDB(), tr.env.Config().DB.AppDebugWithDB())

db.AddQuery(fmt.Sprintf("SELECT ts FROM %s.heartbeat WHERE keyspaceShard='%s'", "_vt", tr.keyspaceShard), &sqltypes.Result{
Fields: []*querypb.Field{
{Name: "ts", Type: sqltypes.Int64},
Expand Down Expand Up @@ -79,14 +83,46 @@ func TestReaderReadHeartbeat(t *testing.T) {
utils.MustMatch(t, expectedHisto, heartbeatLagNsHistogram.Counts(), "wrong counts in histogram")
}

// TestReaderCloseSetsCurrentLagToZero tests that when closing the heartbeat reader, the current lag is
// set to zero.
func TestReaderCloseSetsCurrentLagToZero(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()
tr := newReader(db, nil)

db.AddQuery(fmt.Sprintf("SELECT ts FROM %s.heartbeat WHERE keyspaceShard='%s'", "_vt", tr.keyspaceShard), &sqltypes.Result{
Fields: []*querypb.Field{
{Name: "ts", Type: sqltypes.Int64},
},
Rows: [][]sqltypes.Value{{
sqltypes.NewInt64(time.Now().Add(-10 * time.Second).UnixNano()),
}},
})

currentLagNs.Reset()

tr.Open()
time.Sleep(2 * time.Second)

assert.Greater(t, currentLagNs.Get(), int64(0), "lag should be greater than zero")

tr.Close()

assert.Equal(t, int64(0), currentLagNs.Get(), "lag should be be zero after closing the reader.")
}

// TestReaderReadHeartbeatError tests that we properly account for errors
// encountered in the reading of heartbeat.
func TestReaderReadHeartbeatError(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()
tr := newReader(db, mockNowFunc)

now := time.Now()
tr := newReader(db, &now)
defer tr.Close()

tr.pool.Open(tr.env.Config().DB.AppWithDB(), tr.env.Config().DB.DbaWithDB(), tr.env.Config().DB.AppDebugWithDB())

cumulativeLagNs.Reset()
readErrors.Reset()

Expand All @@ -100,18 +136,23 @@ func TestReaderReadHeartbeatError(t *testing.T) {
assert.Equal(t, int64(1), readErrors.Get(), "wrong read error count")
}

func newReader(db *fakesqldb.DB, nowFunc func() time.Time) *heartbeatReader {
func newReader(db *fakesqldb.DB, frozenTime *time.Time) *heartbeatReader {
config := tabletenv.NewDefaultConfig()
config.ReplicationTracker.Mode = tabletenv.Heartbeat
config.ReplicationTracker.HeartbeatIntervalSeconds = 1
params, _ := db.ConnParams().MysqlParams()
cp := *params
dbc := dbconfigs.NewTestDBConfigs(cp, cp, "")
config.DB = dbc

tr := newHeartbeatReader(tabletenv.NewEnv(config, "ReaderTest"))
tr.keyspaceShard = "test:0"
tr.now = nowFunc
tr.pool.Open(dbc.AppWithDB(), dbc.DbaWithDB(), dbc.AppDebugWithDB())

if frozenTime != nil {
tr.now = func() time.Time {
return *frozenTime
}
}

return tr
}
26 changes: 14 additions & 12 deletions go/vt/vttablet/tabletserver/repltracker/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,11 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)

var (
now = time.Now()
mockNowFunc = func() time.Time {
return now
}
)

func TestCreateSchema(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()
tw := newTestWriter(db, mockNowFunc)
now := time.Now()
tw := newTestWriter(db, &now)
defer tw.Close()
writes.Reset()

Expand All @@ -66,7 +60,8 @@ func TestWriteHeartbeat(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()

tw := newTestWriter(db, mockNowFunc)
now := time.Now()
tw := newTestWriter(db, &now)
upsert := fmt.Sprintf("INSERT INTO %s.heartbeat (ts, tabletUid, keyspaceShard) VALUES (%d, %d, '%s') ON DUPLICATE KEY UPDATE ts=VALUES(ts), tabletUid=VALUES(tabletUid)",
"_vt", now.UnixNano(), tw.tabletAlias.Uid, tw.keyspaceShard)
db.AddQuery(upsert, &sqltypes.Result{})
Expand All @@ -83,7 +78,8 @@ func TestWriteHeartbeatError(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()

tw := newTestWriter(db, mockNowFunc)
now := time.Now()
tw := newTestWriter(db, &now)

writes.Reset()
writeErrors.Reset()
Expand All @@ -93,7 +89,7 @@ func TestWriteHeartbeatError(t *testing.T) {
assert.Equal(t, int64(1), writeErrors.Get())
}

func newTestWriter(db *fakesqldb.DB, nowFunc func() time.Time) *heartbeatWriter {
func newTestWriter(db *fakesqldb.DB, frozenTime *time.Time) *heartbeatWriter {
config := tabletenv.NewDefaultConfig()
config.ReplicationTracker.Mode = tabletenv.Heartbeat
config.ReplicationTracker.HeartbeatIntervalSeconds = 1
Expand All @@ -104,7 +100,13 @@ func newTestWriter(db *fakesqldb.DB, nowFunc func() time.Time) *heartbeatWriter

tw := newHeartbeatWriter(tabletenv.NewEnv(config, "WriterTest"), &topodatapb.TabletAlias{Cell: "test", Uid: 1111})
tw.keyspaceShard = "test:0"
tw.now = nowFunc

if frozenTime != nil {
tw.now = func() time.Time {
return *frozenTime
}
}

tw.appPool.Open(dbc.AppWithDB())
tw.allPrivsPool.Open(dbc.AllPrivsWithDB())

Expand Down

0 comments on commit b5196e8

Please sign in to comment.