Skip to content

Commit

Permalink
tests: Enable progress notify in linearizability tests
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius authored and ahrtr committed Feb 8, 2023
1 parent 586eacc commit 39d9852
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 5 deletions.
8 changes: 8 additions & 0 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ type EtcdProcessClusterConfig struct {
WarningUnaryRequestDuration time.Duration
ExperimentalWarningUnaryRequestDuration time.Duration
PeerProxy bool
WatchProcessNotifyInterval time.Duration
}

func DefaultConfig() *EtcdProcessClusterConfig {
Expand Down Expand Up @@ -336,6 +337,10 @@ func WithCompactionBatchLimit(limit int) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.CompactionBatchLimit = limit }
}

func WithWatchProcessNotifyInterval(interval time.Duration) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.WatchProcessNotifyInterval = interval }
}

func WithPeerProxy(enabled bool) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.PeerProxy = enabled }
}
Expand Down Expand Up @@ -573,6 +578,9 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
if cfg.ExperimentalWarningUnaryRequestDuration != 0 {
args = append(args, "--experimental-warning-unary-request-duration", cfg.ExperimentalWarningUnaryRequestDuration.String())
}
if cfg.WatchProcessNotifyInterval != 0 {
args = append(args, "--experimental-watch-progress-notify-interval", cfg.WatchProcessNotifyInterval.String())
}
if cfg.SnapshotCatchUpEntries > 0 {
args = append(args, "--experimental-snapshot-catchup-entries", fmt.Sprintf("%d", cfg.SnapshotCatchUpEntries))
}
Expand Down
5 changes: 4 additions & 1 deletion tests/linearizability/linearizability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func TestLinearizability(t *testing.T) {
e2e.WithSnapshotCount(100),
e2e.WithGoFailEnabled(true),
e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
e2e.WithWatchProcessNotifyInterval(100*time.Millisecond),
),
})
scenarios = append(scenarios, scenario{
Expand All @@ -114,6 +115,7 @@ func TestLinearizability(t *testing.T) {
e2e.WithPeerProxy(true),
e2e.WithGoFailEnabled(true),
e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints
e2e.WithWatchProcessNotifyInterval(100*time.Millisecond),
),
})
}
Expand Down Expand Up @@ -177,7 +179,8 @@ func TestLinearizability(t *testing.T) {
waitBetweenTriggers: waitBetweenFailpointTriggers,
}, *scenario.traffic)
forcestopCluster(clus)
validateWatchResponses(t, watchResponses)
watchProgressNotifyEnabled := clus.Cfg.WatchProcessNotifyInterval != 0
validateWatchResponses(t, watchResponses, watchProgressNotifyEnabled)
longestHistory, remainingEvents := watchEventHistory(watchResponses)
validateEventsMatch(t, longestHistory, remainingEvents)
operations = patchOperationBasedOnWatchEvents(operations, longestHistory)
Expand Down
16 changes: 12 additions & 4 deletions tests/linearizability/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func collectClusterWatchEvents(ctx context.Context, t *testing.T, lg *zap.Logger
wg.Wait()
return memberResponses
}

func watchMember(ctx context.Context, lg *zap.Logger, c *clientv3.Client) (resps []watchResponse) {
var lastRevision int64 = 0
for {
Expand All @@ -64,7 +65,7 @@ func watchMember(ctx context.Context, lg *zap.Logger, c *clientv3.Client) (resps
return resps
default:
}
for resp := range c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1)) {
for resp := range c.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1), clientv3.WithProgressNotify()) {
resps = append(resps, watchResponse{resp, time.Now()})
lastRevision = resp.Header.Revision
if resp.Err() != nil {
Expand All @@ -74,18 +75,22 @@ func watchMember(ctx context.Context, lg *zap.Logger, c *clientv3.Client) (resps
}
}

func validateWatchResponses(t *testing.T, responses [][]watchResponse) {
func validateWatchResponses(t *testing.T, responses [][]watchResponse, expectProgressNotify bool) {
for _, memberResponses := range responses {
validateMemberWatchResponses(t, memberResponses)
validateMemberWatchResponses(t, memberResponses, expectProgressNotify)
}
}

func validateMemberWatchResponses(t *testing.T, responses []watchResponse) {
func validateMemberWatchResponses(t *testing.T, responses []watchResponse, expectProgressNotify bool) {
var gotProgressNotify = false
var lastRevision int64 = 1
for _, resp := range responses {
if resp.Header.Revision < lastRevision {
t.Errorf("Revision should never decrease")
}
if resp.IsProgressNotify() && resp.Header.Revision == lastRevision {
gotProgressNotify = true
}
if resp.Header.Revision == lastRevision && len(resp.Events) != 0 {
t.Errorf("Got two non-empty responses about same revision")
}
Expand All @@ -100,6 +105,9 @@ func validateMemberWatchResponses(t *testing.T, responses []watchResponse) {
}
lastRevision = resp.Header.Revision
}
if gotProgressNotify != expectProgressNotify {
t.Errorf("Expected progress notify: %v, got: %v", expectProgressNotify, gotProgressNotify)
}
}

func toWatchEvents(responses []watchResponse) (events []watchEvent) {
Expand Down

0 comments on commit 39d9852

Please sign in to comment.