Skip to content

Commit

Permalink
ensure the traffic split is resetting between retries; make it less c…
Browse files Browse the repository at this point in the history
…onfigurable
  • Loading branch information
rboyer committed Nov 8, 2023
1 parent 0f8736f commit 588545d
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 25 deletions.
8 changes: 4 additions & 4 deletions test-integ/catalogv2/explicit_destinations_l7_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,16 @@ func TestSplitterFeaturesL7ExplicitDestinations(t *testing.T) {
switch u.PortName {
case "tcp":
asserter.CheckBlankspaceNameTrafficSplitViaTCP(t, svc, u,
map[string]int{v1Expect: 10, v2Expect: 90}, 2)
map[string]int{v1Expect: 10, v2Expect: 90})
case "grpc":
asserter.CheckBlankspaceNameTrafficSplitViaGRPC(t, svc, u,
map[string]int{v1Expect: 10, v2Expect: 90}, 2)
map[string]int{v1Expect: 10, v2Expect: 90})
case "http":
asserter.CheckBlankspaceNameTrafficSplitViaHTTP(t, svc, u, false, "/",
map[string]int{v1Expect: 10, v2Expect: 90}, 2)
map[string]int{v1Expect: 10, v2Expect: 90})
case "http2":
asserter.CheckBlankspaceNameTrafficSplitViaHTTP(t, svc, u, true, "/",
map[string]int{v1Expect: 10, v2Expect: 90}, 2)
map[string]int{v1Expect: 10, v2Expect: 90})
default:
t.Fatalf("unexpected port name: %s", u.PortName)
}
Expand Down
70 changes: 49 additions & 21 deletions test-integ/topoutil/asserter_blankspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (a *Asserter) CheckBlankspaceNameViaHTTP(
) {
t.Helper()

a.checkBlankspaceNameViaHTTPWithCallback(t, service, upstream, useHTTP2, path, 1, func(r *retry.R, remoteName string) {
a.checkBlankspaceNameViaHTTPWithCallback(t, service, upstream, useHTTP2, path, 1, func(_ *retry.R) {}, func(r *retry.R, remoteName string) {
require.Equal(r, fmt.Sprintf("%s::%s", clusterName, sid.String()), remoteName)
}, func(r *retry.R) {})
}
Expand All @@ -41,15 +41,16 @@ func (a *Asserter) CheckBlankspaceNameTrafficSplitViaHTTP(
useHTTP2 bool,
path string,
expect map[string]int,
epsilon int,
) {
t.Helper()

got := make(map[string]int)
a.checkBlankspaceNameViaHTTPWithCallback(t, service, upstream, useHTTP2, path, 100, func(_ *retry.R, name string) {
a.checkBlankspaceNameViaHTTPWithCallback(t, service, upstream, useHTTP2, path, 100, func(_ *retry.R) {
got = make(map[string]int)
}, func(_ *retry.R, name string) {
got[name]++
}, func(r *retry.R) {
assertTrafficSplit(r, got, expect, epsilon)
assertTrafficSplitFor100Requests(r, got, expect)
})
}

Expand All @@ -60,6 +61,7 @@ func (a *Asserter) checkBlankspaceNameViaHTTPWithCallback(
useHTTP2 bool,
path string,
count int,
resetFn func(r *retry.R),
attemptFn func(r *retry.R, remoteName string),
checkFn func(r *retry.R),
) {
Expand Down Expand Up @@ -96,7 +98,7 @@ func (a *Asserter) checkBlankspaceNameViaHTTPWithCallback(
actualURL = fmt.Sprintf("http://localhost:%d/%s", upstream.LocalPort, path)
}

multiassert(t, count, func(r *retry.R) {
multiassert(t, count, resetFn, func(r *retry.R) {
name, err := GetBlankspaceNameViaHTTP(context.Background(), client, addr, actualURL)
require.NoError(r, err)
attemptFn(r, name)
Expand All @@ -116,7 +118,7 @@ func (a *Asserter) CheckBlankspaceNameViaTCP(
) {
t.Helper()

a.checkBlankspaceNameViaTCPWithCallback(t, service, upstream, 1, func(r *retry.R, remoteName string) {
a.checkBlankspaceNameViaTCPWithCallback(t, service, upstream, 1, func(_ *retry.R) {}, func(r *retry.R, remoteName string) {
require.Equal(r, fmt.Sprintf("%s::%s", clusterName, sid.String()), remoteName)
}, func(r *retry.R) {})
}
Expand All @@ -128,15 +130,16 @@ func (a *Asserter) CheckBlankspaceNameTrafficSplitViaTCP(
service *topology.Service,
upstream *topology.Upstream,
expect map[string]int,
epsilon int,
) {
t.Helper()

got := make(map[string]int)
a.checkBlankspaceNameViaTCPWithCallback(t, service, upstream, 100, func(_ *retry.R, name string) {
a.checkBlankspaceNameViaTCPWithCallback(t, service, upstream, 100, func(_ *retry.R) {
got = make(map[string]int)
}, func(_ *retry.R, name string) {
got[name]++
}, func(r *retry.R) {
assertTrafficSplit(r, got, expect, epsilon)
assertTrafficSplitFor100Requests(r, got, expect)
})
}

Expand All @@ -145,6 +148,7 @@ func (a *Asserter) checkBlankspaceNameViaTCPWithCallback(
service *topology.Service,
upstream *topology.Upstream,
count int,
resetFn func(r *retry.R),
attemptFn func(r *retry.R, remoteName string),
checkFn func(r *retry.R),
) {
Expand All @@ -162,7 +166,7 @@ func (a *Asserter) checkBlankspaceNameViaTCPWithCallback(

addr := fmt.Sprintf("%s:%d", "127.0.0.1", exposedPort)

multiassert(t, count, func(r *retry.R) {
multiassert(t, count, resetFn, func(r *retry.R) {
name, err := GetBlankspaceNameViaTCP(context.Background(), addr)
require.NoError(r, err)
attemptFn(r, name)
Expand All @@ -182,9 +186,9 @@ func (a *Asserter) CheckBlankspaceNameViaGRPC(
) {
t.Helper()

a.checkBlankspaceNameViaGRPCWithCallback(t, service, upstream, 1, func(r *retry.R, remoteName string) {
a.checkBlankspaceNameViaGRPCWithCallback(t, service, upstream, 1, func(_ *retry.R) {}, func(r *retry.R, remoteName string) {
require.Equal(r, fmt.Sprintf("%s::%s", clusterName, sid.String()), remoteName)
}, func(r *retry.R) {})
}, func(_ *retry.R) {})
}

// CheckBlankspaceNameTrafficSplitViaGRPC is like CheckBlankspaceNameViaGRPC
Expand All @@ -194,15 +198,16 @@ func (a *Asserter) CheckBlankspaceNameTrafficSplitViaGRPC(
service *topology.Service,
upstream *topology.Upstream,
expect map[string]int,
epsilon int,
) {
t.Helper()

got := make(map[string]int)
a.checkBlankspaceNameViaGRPCWithCallback(t, service, upstream, 100, func(_ *retry.R, name string) {
a.checkBlankspaceNameViaGRPCWithCallback(t, service, upstream, 100, func(_ *retry.R) {
got = make(map[string]int)
}, func(_ *retry.R, name string) {
got[name]++
}, func(r *retry.R) {
assertTrafficSplit(r, got, expect, epsilon)
assertTrafficSplitFor100Requests(r, got, expect)
})
}

Expand All @@ -211,6 +216,7 @@ func (a *Asserter) checkBlankspaceNameViaGRPCWithCallback(
service *topology.Service,
upstream *topology.Upstream,
count int,
resetFn func(r *retry.R),
attemptFn func(r *retry.R, remoteName string),
checkFn func(r *retry.R),
) {
Expand All @@ -228,7 +234,7 @@ func (a *Asserter) checkBlankspaceNameViaGRPCWithCallback(

addr := fmt.Sprintf("%s:%d", "127.0.0.1", exposedPort)

multiassert(t, count, func(r *retry.R) {
multiassert(t, count, resetFn, func(r *retry.R) {
name, err := GetBlankspaceNameViaGRPC(context.Background(), addr)
require.NoError(r, err)
attemptFn(r, name)
Expand All @@ -237,23 +243,44 @@ func (a *Asserter) checkBlankspaceNameViaGRPCWithCallback(
})
}

// assertTrafficSplitFor100Requests compares the counts of 100 requests that
// did reach an observed set of destinations (nameCounts) against the expected
// counts of those same services is the same within a fixed difference of 2.
func assertTrafficSplitFor100Requests(t require.TestingT, nameCounts map[string]int, expect map[string]int) {
const (
numRequests = 100
allowedDelta = 2
)
require.Equal(t, numRequests, sumMapValues(nameCounts), "measured traffic was not %d requests", numRequests)
require.Equal(t, numRequests, sumMapValues(expect), "expected traffic was not %d requests", numRequests)
assertTrafficSplit(t, nameCounts, expect, allowedDelta)
}

func sumMapValues(m map[string]int) int {
sum := 0
for _, v := range m {
sum += v
}
return sum
}

// assertTrafficSplit compares the counts of requests that did reach an
// observed set of destinations (nameCounts) against the expected counts of
// those same services is the same within the provided epsilon value.
// those same services is the same within the provided allowedDelta value.
//
// When doing random traffic splits it'll never be perfect so we need the
// wiggle room to avoid having a flaky test.
func assertTrafficSplit(t require.TestingT, nameCounts map[string]int, expect map[string]int, epsilon int) {
func assertTrafficSplit(t require.TestingT, nameCounts map[string]int, expect map[string]int, allowedDelta int) {
require.Len(t, nameCounts, len(expect))
for name, expectCount := range expect {
gotCount, ok := nameCounts[name]
require.True(t, ok)
if len(expect) == 1 {
require.Equal(t, expectCount, gotCount)
} else {
require.InEpsilon(t, expectCount, gotCount, float64(epsilon),
require.InDelta(t, expectCount, gotCount, float64(allowedDelta),
"expected %q side of split to have %d requests not %d (e=%d)",
name, expectCount, gotCount, epsilon,
name, expectCount, gotCount, allowedDelta,
)
}
}
Expand All @@ -265,8 +292,9 @@ func assertTrafficSplit(t require.TestingT, nameCounts map[string]int, expect ma
// It's primary use at the time it was written was to execute a set of requests
// repeatedly to witness where the requests went, and then at the end doing a
// verification of traffic splits (a bit like MAP/REDUCE).
func multiassert(t *testing.T, count int, attemptFn, checkFn func(r *retry.R)) {
func multiassert(t *testing.T, count int, resetFn, attemptFn, checkFn func(r *retry.R)) {
retry.RunWith(&retry.Timer{Timeout: 30 * time.Second, Wait: 500 * time.Millisecond}, t, func(r *retry.R) {
resetFn(r)
for i := 0; i < count; i++ {
attemptFn(r)
}
Expand Down

0 comments on commit 588545d

Please sign in to comment.