Use our heartbeat echo RPCs to estimate clock skew, expose it in status APIs #24343

Merged (2 commits, Dec 4, 2023)
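
The echo handler that produces these numbers isn't part of the visible diff; the sketch below is only an illustration of the idea in the title, with hypothetical names throughout (`estimateSkew`, the `echo` callback):

```go
package sketch

import "time"

// estimateSkew is an illustrative stand-in for the echo-based estimate:
// the standby times a heartbeat echo round trip and compares the active
// node's clock (stamped into the reply) against its own. Per the test
// added in this PR, the real estimate does not subtract the round-trip
// time, so identical clocks yield a slightly negative skew.
func estimateSkew(echo func() (activeNow time.Time, err error)) (echoDuration, skew time.Duration, err error) {
	start := time.Now()
	activeNow, err := echo() // active node stamps its local clock in the reply
	if err != nil {
		return 0, 0, err
	}
	echoDuration = time.Since(start)
	skew = activeNow.Sub(time.Now()) // negative by ~latency when clocks agree
	return echoDuration, skew, nil
}
```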
12 changes: 7 additions & 5 deletions api/replication_status.go
@@ -19,11 +19,13 @@ const (
)

type ClusterInfo struct {
APIAddr string `json:"api_address,omitempty" mapstructure:"api_address"`
ClusterAddress string `json:"cluster_address,omitempty" mapstructure:"cluster_address"`
ConnectionStatus string `json:"connection_status,omitempty" mapstructure:"connection_status"`
LastHeartBeat string `json:"last_heartbeat,omitempty" mapstructure:"last_heartbeat"`
NodeID string `json:"node_id,omitempty" mapstructure:"node_id"`
APIAddr string `json:"api_address,omitempty" mapstructure:"api_address"`
ClusterAddress string `json:"cluster_address,omitempty" mapstructure:"cluster_address"`
ConnectionStatus string `json:"connection_status,omitempty" mapstructure:"connection_status"`
LastHeartBeat string `json:"last_heartbeat,omitempty" mapstructure:"last_heartbeat"`
LastHeartBeatDurationMillis string `json:"last_heartbeat_duration_ms,omitempty" mapstructure:"last_heartbeat_duration_ms"`
ClockSkewMillis string `json:"clock_skew_ms,omitempty" mapstructure:"clock_skew_ms"`
NodeID string `json:"node_id,omitempty" mapstructure:"node_id"`
}

type ReplicationStatusGenericResponse struct {
18 changes: 10 additions & 8 deletions api/sys_hastatus.go
@@ -35,12 +35,14 @@ type HAStatusResponse struct {
}

type HANode struct {
Hostname string `json:"hostname"`
APIAddress string `json:"api_address"`
ClusterAddress string `json:"cluster_address"`
ActiveNode bool `json:"active_node"`
LastEcho *time.Time `json:"last_echo"`
Version string `json:"version"`
UpgradeVersion string `json:"upgrade_version,omitempty"`
RedundancyZone string `json:"redundancy_zone,omitempty"`
Hostname string `json:"hostname"`
APIAddress string `json:"api_address"`
ClusterAddress string `json:"cluster_address"`
ActiveNode bool `json:"active_node"`
LastEcho *time.Time `json:"last_echo"`
EchoDurationMillis int64 `json:"echo_duration_ms"`
ClockSkewMillis int64 `json:"clock_skew_ms"`
Version string `json:"version"`
UpgradeVersion string `json:"upgrade_version,omitempty"`
RedundancyZone string `json:"redundancy_zone,omitempty"`
}
2 changes: 2 additions & 0 deletions api/sys_health.go
@@ -50,4 +50,6 @@ type HealthResponse struct {
ClusterID string `json:"cluster_id,omitempty"`
LastWAL uint64 `json:"last_wal,omitempty"`
Enterprise bool `json:"enterprise"`
EchoDurationMillis int64 `json:"echo_duration_ms"`
ClockSkewMillis int64 `json:"clock_skew_ms"`
}
5 changes: 5 additions & 0 deletions changelog/24343.txt
@@ -0,0 +1,5 @@
```release-note:improvement
api: sys/health and sys/ha-status now expose how long the last heartbeat
echo took, plus the clock skew between the standby and active node
estimated from that echo.
```
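
As a hedged usage sketch of the new fields through the Go client (the setup below is illustrative and assumes VAULT_ADDR and VAULT_TOKEN are set in the environment):

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/vault/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig()) // reads VAULT_ADDR/VAULT_TOKEN
	if err != nil {
		log.Fatal(err)
	}

	// sys/health: the queried node's own measurement of its last
	// heartbeat echo to the active node (only meaningful on a standby).
	health, err := client.Sys().Health()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("last echo: %dms, estimated skew: %dms\n",
		health.EchoDurationMillis, health.ClockSkewMillis)

	// sys/ha-status: the active node's view of every peer's last echo.
	ha, err := client.Sys().HAStatus()
	if err != nil {
		log.Fatal(err)
	}
	for _, node := range ha.Nodes {
		fmt.Printf("%s: echo %dms, skew %dms\n",
			node.Hostname, node.EchoDurationMillis, node.ClockSkewMillis)
	}
}
```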
4 changes: 4 additions & 0 deletions http/sys_health.go
@@ -208,6 +208,8 @@ func getSysHealth(core *vault.Core, r *http.Request) (int, *HealthResponse, erro
Enterprise: constants.IsEnterprise,
ClusterName: clusterName,
ClusterID: clusterID,
ClockSkewMillis: core.ActiveNodeClockSkewMillis(),
EchoDurationMillis: core.EchoDuration().Milliseconds(),
}

licenseState, err := core.EntGetLicenseState()
@@ -252,4 +254,6 @@ type HealthResponse struct {
ClusterID string `json:"cluster_id,omitempty"`
LastWAL uint64 `json:"last_wal,omitempty"`
License *HealthResponseLicense `json:"license,omitempty"`
EchoDurationMillis int64 `json:"echo_duration_ms"`
ClockSkewMillis int64 `json:"clock_skew_ms"`
}
2 changes: 1 addition & 1 deletion scripts/protocversioncheck.sh
@@ -9,7 +9,7 @@ PROTOC_CMD=${PROTOC_CMD:-protoc}
PROTOC_VERSION_EXACT="$1"
echo "==> Checking that protoc is at version $1..."

PROTOC_VERSION=$($PROTOC_CMD --version | grep -o '[0-9]\+\.[0-9]\+\.[0-9]\+')
PROTOC_VERSION=$($PROTOC_CMD --version | grep -o '[0-9]\+\.[0-9]\+\(\.[0-9]\+\)\?')

if [ "$PROTOC_VERSION" == "$PROTOC_VERSION_EXACT" ]; then
echo "Using protoc version $PROTOC_VERSION"
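
The loosened pattern accepts versions without a patch component; newer protoc builds report e.g. `libprotoc 25.1`. A quick hedged check of the same pattern shape, written in Go's regexp syntax for illustration:

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Equivalent shape to the grep -o pattern above: major.minor with an
	// optional .patch component.
	re := regexp.MustCompile(`[0-9]+\.[0-9]+(\.[0-9]+)?`)
	for _, out := range []string{"libprotoc 3.21.12", "libprotoc 25.1"} {
		fmt.Println(re.FindString(out)) // prints "3.21.12", then "25.1"
	}
}
```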
62 changes: 62 additions & 0 deletions sdk/helper/testcluster/util.go
@@ -209,6 +209,68 @@ func WaitForActiveNode(ctx context.Context, cluster VaultCluster) (int, error) {
return -1, ctx.Err()
}

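// WaitForStandbyNode polls the node at nodeIdx until it reports a leader
// address without claiming leadership itself. It fails fast if the node
// turns out to be the active node, and gives up when ctx expires.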
func WaitForStandbyNode(ctx context.Context, cluster VaultCluster, nodeIdx int) error {
if nodeIdx >= len(cluster.Nodes()) {
return fmt.Errorf("invalid nodeIdx %d for cluster", nodeIdx)
}
node := cluster.Nodes()[nodeIdx]
client := node.APIClient()

var err error
for ctx.Err() == nil {
var resp *api.LeaderResponse

resp, err = client.Sys().LeaderWithContext(ctx)
switch {
case err != nil:
case resp.IsSelf:
return fmt.Errorf("waiting for standby but node is leader")
case resp.LeaderAddress == "":
err = fmt.Errorf("node doesn't know leader address")
default:
return nil
}

time.Sleep(100 * time.Millisecond)
}
if err == nil {
err = ctx.Err()
}
return err
}

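// WaitForActiveNodeAndStandbys waits for a leader to be elected, then waits
// in parallel for every other node to become a healthy standby, returning
// the leader's index along with any accumulated errors.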
func WaitForActiveNodeAndStandbys(ctx context.Context, cluster VaultCluster) (int, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

leaderIdx, err := WaitForActiveNode(ctx, cluster)
if err != nil {
return 0, err
}

if len(cluster.Nodes()) == 1 {
return 0, nil
}

errs := make(chan error)
for i := range cluster.Nodes() {
if i == leaderIdx {
continue
}
go func(i int) {
errs <- WaitForStandbyNode(ctx, cluster, i)
}(i)
}

var merr *multierror.Error
expectedStandbys := len(cluster.Nodes()) - 1
for i := 0; i < expectedStandbys; i++ {
merr = multierror.Append(merr, <-errs)
}

return leaderIdx, merr.ErrorOrNil()
}

func WaitForActiveNodeAndPerfStandbys(ctx context.Context, cluster VaultCluster) error {
logger := cluster.NamedLogger("WaitForActiveNodeAndPerfStandbys")
// This WaitForActiveNode was added because after a Raft cluster is sealed
19 changes: 11 additions & 8 deletions vault/cluster/simulations.go
@@ -7,6 +7,8 @@ import (
"io"
"net"
"time"

uberAtomic "go.uber.org/atomic"
)

type delayedConn struct {
@@ -15,12 +17,13 @@ type delayedConn struct {
}

func newDelayedConn(conn net.Conn, delay time.Duration) net.Conn {
dr := &delayedReader{
r: conn,
delay: uberAtomic.NewDuration(delay),
}
return &delayedConn{
dr: dr,
Conn: conn,
dr: &delayedReader{
r: conn,
delay: delay,
},
}
}

@@ -29,18 +32,18 @@ func (conn *delayedConn) Read(data []byte) (int, error) {
}

func (conn *delayedConn) SetDelay(delay time.Duration) {
conn.dr.delay = delay
conn.dr.delay.Store(delay)
}

type delayedReader struct {
r io.Reader
delay time.Duration
delay *uberAtomic.Duration
}

func (dr *delayedReader) Read(data []byte) (int, error) {
// Sleep for the delay period prior to reading
if dr.delay > 0 {
time.Sleep(dr.delay)
if delay := dr.delay.Load(); delay != 0 {
time.Sleep(delay)
}

return dr.r.Read(data)
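
Switching the delay to go.uber.org/atomic's `Duration` makes `SetDelay` safe to call while another goroutine is blocked in `Read`; the previous plain `time.Duration` field was a data race. A hedged sketch of the intended pattern (the `net.Pipe` wiring is illustrative only):

```go
package cluster

import (
	"net"
	"time"
)

// delaySketch shows the concurrency pattern the atomic Duration enables:
// one goroutine reads through the delayed conn while another retunes the
// delay, with no data race between Store and Load.
func delaySketch() {
	raw, _ := net.Pipe()
	conn := newDelayedConn(raw, 250*time.Millisecond).(*delayedConn)

	go func() {
		buf := make([]byte, 1024)
		_, _ = conn.Read(buf) // sleeps for the current delay before reading
	}()

	conn.SetDelay(0) // atomic store; safe alongside the concurrent Read
}
```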
45 changes: 31 additions & 14 deletions vault/core.go
@@ -698,6 +698,17 @@ type Core struct {
WellKnownRedirects *wellKnownRedirectRegistry // RFC 5785
// Config value for "detect_deadlocks".
detectDeadlocks []string

echoDuration *uberAtomic.Duration
activeNodeClockSkewMillis *uberAtomic.Int64
}

func (c *Core) ActiveNodeClockSkewMillis() int64 {
return c.activeNodeClockSkewMillis.Load()
}

func (c *Core) EchoDuration() time.Duration {
return c.echoDuration.Load()
}

// c.stateLock needs to be held in read mode before calling this function.
@@ -1045,6 +1056,8 @@ func CreateCore(conf *CoreConfig) (*Core, error) {
impreciseLeaseRoleTracking: conf.ImpreciseLeaseRoleTracking,
WellKnownRedirects: NewWellKnownRedirects(),
detectDeadlocks: detectDeadlocks,
echoDuration: uberAtomic.NewDuration(0),
activeNodeClockSkewMillis: uberAtomic.NewInt64(0),
}

c.standbyStopCh.Store(make(chan struct{}))
@@ -3919,13 +3932,15 @@ func (c *Core) ReloadIntrospectionEndpointEnabled() {
}

type PeerNode struct {
Hostname string `json:"hostname"`
APIAddress string `json:"api_address"`
ClusterAddress string `json:"cluster_address"`
Version string `json:"version"`
LastEcho time.Time `json:"last_echo"`
UpgradeVersion string `json:"upgrade_version,omitempty"`
RedundancyZone string `json:"redundancy_zone,omitempty"`
Hostname string `json:"hostname"`
APIAddress string `json:"api_address"`
ClusterAddress string `json:"cluster_address"`
Version string `json:"version"`
LastEcho time.Time `json:"last_echo"`
UpgradeVersion string `json:"upgrade_version,omitempty"`
RedundancyZone string `json:"redundancy_zone,omitempty"`
EchoDuration time.Duration `json:"echo_duration"`
ClockSkewMillis int64 `json:"clock_skew_millis"`
}

// GetHAPeerNodesCached returns the nodes that've sent us Echo requests recently.
@@ -3934,13 +3949,15 @@ func (c *Core) GetHAPeerNodesCached() []PeerNode {
for itemClusterAddr, item := range c.clusterPeerClusterAddrsCache.Items() {
info := item.Object.(nodeHAConnectionInfo)
nodes = append(nodes, PeerNode{
Hostname: info.nodeInfo.Hostname,
APIAddress: info.nodeInfo.ApiAddr,
ClusterAddress: itemClusterAddr,
LastEcho: info.lastHeartbeat,
Version: info.version,
UpgradeVersion: info.upgradeVersion,
RedundancyZone: info.redundancyZone,
Hostname: info.nodeInfo.Hostname,
APIAddress: info.nodeInfo.ApiAddr,
ClusterAddress: itemClusterAddr,
LastEcho: info.lastHeartbeat,
Version: info.version,
UpgradeVersion: info.upgradeVersion,
RedundancyZone: info.redundancyZone,
EchoDuration: info.echoDuration,
ClockSkewMillis: info.clockSkewMillis,
})
}
return nodes
114 changes: 114 additions & 0 deletions vault/external_tests/standby/standby_test.go
@@ -0,0 +1,114 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package standby

import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/hashicorp/vault/helper/constants"
"github.com/hashicorp/vault/helper/testhelpers/corehelpers"
"github.com/hashicorp/vault/helper/testhelpers/teststorage"
"github.com/hashicorp/vault/sdk/helper/testcluster"
"github.com/hashicorp/vault/vault"
"github.com/hashicorp/vault/vault/cluster"
)

// Test_Echo_Duration_Skew tests that the sys/health and sys/ha-status endpoints
// report reasonable values for echo duration and clock skew.
func Test_Echo_Duration_Skew(t *testing.T) {
t.Parallel()
cases := []struct {
name string
perfstandby bool
}{
{"standby", false},
{"perfstandby", true},
}
for i := range cases {
perfstandby := cases[i].perfstandby
if perfstandby && !constants.IsEnterprise {
continue
}
t.Run(cases[i].name, func(t *testing.T) {
t.Parallel()
conf, opts := teststorage.ClusterSetup(nil, nil, nil)
name := strings.Replace(t.Name(), "/", "_", -1)
logger := corehelpers.NewTestLogger(t)
layers, err := cluster.NewInmemLayerCluster(name, 3, logger)
if err != nil {
t.Fatal(err)
}
opts.ClusterLayers = layers
opts.Logger = logger
conf.DisablePerformanceStandby = !perfstandby
cluster := vault.NewTestCluster(t, conf, opts)
defer cluster.Cleanup()

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
leaderIdx, err := testcluster.WaitForActiveNodeAndStandbys(ctx, cluster)
if err != nil {
t.Fatal(err)
}
leader := cluster.Nodes()[leaderIdx]

// The delay applies in both directions, hence a 0.25s delay implies a 0.5s roundtrip delay
layers.SetReaderDelay(time.Second / 4)

check := func(echoDuration int64, clockSkew int64) error {
if echoDuration < time.Second.Milliseconds()/2 {
return fmt.Errorf("echo duration must exceed 0.5s, got: %dms", echoDuration)
}
// Because all nodes in this test share the same clock, the reported skew
// reflects only the echo's network latency rather than a real clock
// difference; the estimate doesn't subtract the round-trip time of the
// echo request, so the value comes out negative.
if clockSkew == 0 || -clockSkew < time.Second.Milliseconds()/2 {
return fmt.Errorf("clock skew must be nonzero and exceed -0.5s, got: %dms", clockSkew)
}

return nil
}

// We need to wait for at least 2 heartbeats to happen (2s intervals)
corehelpers.RetryUntil(t, 5*time.Second, func() error {
haStatus, err := leader.APIClient().Sys().HAStatus()
if err != nil {
t.Fatal(err)
}
if len(haStatus.Nodes) < 3 {
return fmt.Errorf("expected 3 nodes, got %d", len(haStatus.Nodes))
}
for _, node := range haStatus.Nodes {
if node.ActiveNode {
continue
}

if err := check(node.EchoDurationMillis, node.ClockSkewMillis); err != nil {
return fmt.Errorf("ha-status node %s: %w", node.Hostname, err)
}
}

for i, node := range cluster.Nodes() {
if i == leaderIdx {
continue
}

h, err := node.APIClient().Sys().Health()
if err != nil {
t.Fatal(err)
}

if err := check(h.EchoDurationMillis, h.ClockSkewMillis); err != nil {
return fmt.Errorf("health node %s: %w", node.APIClient().Address(), err)
}
}
return nil
})
})
}
}