Skip to content

Commit

Permalink
liveness: introduce GetLivenessesFromKV
Browse files Browse the repository at this point in the history
Now that we always create a liveness record on start up (cockroachdb#53805), we can
simply fetch all records from KV when wanting an up-to-date view of all
nodes that have ever been a part of the cluster. We add a helper to do
as much, which we'll rely on when introducing long running migrations
(cockroachdb#56107).

It's a bit unfortunate that we're further adding on to the liveness API
without changing the underlying look-aside cache structure, but the
up-to-date records from KV directly is the world we're hoping to start
moving towards over time. The TODO added in [1] outlines what the future
holds.

[1]: cockroachdb@d631239

Release note: None
  • Loading branch information
irfansharif committed Nov 3, 2020
1 parent ecf9e9c commit ce4f607
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 0 deletions.
55 changes: 55 additions & 0 deletions pkg/kv/kvserver/liveness/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package liveness_test
import (
"context"
"fmt"
"sort"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -83,6 +84,60 @@ func TestNodeLivenessAppearsAtStart(t *testing.T) {
}
}

// TestGetLivenessesFromKV verifies that fetching liveness records from KV
// directly retrieves all the records we expect.
func TestGetLivenessesFromKV(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)

// At this point StartTestCluster has waited for all nodes to become live.

// Verify that each servers sees the same set of liveness records in KV.
for i := 0; i < tc.NumServers(); i++ {
nodeID := tc.Server(i).NodeID()
nl := tc.Server(i).NodeLiveness().(*liveness.NodeLiveness)

if live, err := nl.IsLive(nodeID); err != nil {
t.Fatal(err)
} else if !live {
t.Fatalf("node %d not live", nodeID)
}

livenesses, err := nl.GetLivenessesFromKV(ctx)
assert.Nil(t, err)
assert.Equal(t, len(livenesses), tc.NumServers())

var nodeIDs []roachpb.NodeID
for _, liveness := range livenesses {
nodeIDs = append(nodeIDs, liveness.NodeID)

// We expect epoch=1 as nodes first create a liveness record at epoch=0,
// and then increment it during their first heartbeat.
if liveness.Epoch != 1 {
t.Fatalf("expected epoch=1, got epoch=%d", liveness.Epoch)
}
if !liveness.Membership.Active() {
t.Fatalf("expected membership=active, got membership=%s", liveness.Membership)
}
}

sort.Slice(nodeIDs, func(i, j int) bool {
return nodeIDs[i] < nodeIDs[j]
})
for i := range nodeIDs {
expNodeID := roachpb.NodeID(i + 1) // Node IDs are 1-indexed.
if nodeIDs[i] != expNodeID {
t.Fatalf("expected nodeID=%d, got %d", expNodeID, nodeIDs[i])
}
}
}

}

func TestNodeLivenessStatusMap(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
34 changes: 34 additions & 0 deletions pkg/kv/kvserver/liveness/liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,40 @@ func (nl *NodeLiveness) GetLivenesses() []livenesspb.Liveness {
return livenesses
}

// GetLivenessesFromKV returns a slice containing the liveness record of all
// nodes that have ever been a part of the cluster. The records are read from
// the KV layer in a KV transaction. This is in contrast to GetLivenesses above,
// which consults a (possibly stale) in-memory cache.
func (nl *NodeLiveness) GetLivenessesFromKV(ctx context.Context) ([]livenesspb.Liveness, error) {
kvs, err := nl.db.Scan(ctx, keys.NodeLivenessPrefix, keys.NodeLivenessKeyMax, 0)
if err != nil {
return nil, errors.Wrap(err, "unable to get liveness")
}

var results []livenesspb.Liveness
for _, kv := range kvs {
if kv.Value == nil {
return nil, errors.AssertionFailedf("missing liveness record")
}
var liveness livenesspb.Liveness
if err := kv.Value.GetProto(&liveness); err != nil {
return nil, errors.Wrap(err, "invalid liveness record")
}

livenessRec := Record{
Liveness: liveness,
raw: kv.Value.TagAndDataBytes(),
}

// Update our cache with the liveness record we just found.
nl.maybeUpdate(ctx, livenessRec)

results = append(results, liveness)
}

return results, nil
}

// GetLiveness returns the liveness record for the specified nodeID. If the
// liveness record is not found (due to gossip propagation delays or due to the
// node not existing), we surface that to the caller. The record returned also
Expand Down

0 comments on commit ce4f607

Please sign in to comment.