Skip to content

Commit

Permalink
streaming: split event buffer by key (#12080)
Browse files Browse the repository at this point in the history
  • Loading branch information
boxofrad authored Jan 28, 2022
1 parent c1cb58b commit fdfe079
Show file tree
Hide file tree
Showing 14 changed files with 426 additions and 373 deletions.
3 changes: 3 additions & 0 deletions .changelog/12080.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
streaming: Improved performance when the server is handling many concurrent subscriptions and has a high number of CPU cores
```
34 changes: 14 additions & 20 deletions agent/consul/state/catalog_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,26 @@ func (e EventPayloadCheckServiceNode) HasReadPermission(authz acl.Authorizer) bo
return e.Value.CanRead(authz) == acl.Allow
}

func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace, partition string) bool {
if key == "" && namespace == "" && partition == "" {
return true
func (e EventPayloadCheckServiceNode) Subject() stream.Subject {
partition := e.Value.Service.PartitionOrDefault()
if e.overridePartition != "" {
partition = e.overridePartition
}
partition = strings.ToLower(partition)

if e.Value.Service == nil {
return false
namespace := e.Value.Service.NamespaceOrDefault()
if e.overrideNamespace != "" {
namespace = e.overrideNamespace
}
namespace = strings.ToLower(namespace)

name := e.Value.Service.Service
key := e.Value.Service.Service
if e.overrideKey != "" {
name = e.overrideKey
}
ns := e.Value.Service.EnterpriseMeta.NamespaceOrDefault()
if e.overrideNamespace != "" {
ns = e.overrideNamespace
}
ap := e.Value.Service.EnterpriseMeta.PartitionOrDefault()
if e.overridePartition != "" {
ap = e.overridePartition
key = e.overrideKey
}
key = strings.ToLower(key)

return (key == "" || strings.EqualFold(key, name)) &&
(namespace == "" || strings.EqualFold(namespace, ns)) &&
(partition == "" || strings.EqualFold(partition, ap))
return stream.Subject(partition + "/" + namespace + "/" + key)
}

// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
Expand All @@ -67,8 +62,7 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc {
defer tx.Abort()

connect := topic == topicServiceHealthConnect
entMeta := structs.NewEnterpriseMetaWithPartition(req.Partition, req.Namespace)
idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, &entMeta)
idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, &req.EnterpriseMeta)
if err != nil {
return 0, err
}
Expand Down
200 changes: 97 additions & 103 deletions agent/consul/state/catalog_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,106 @@ import (
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/types"
)

func TestEventPayloadCheckServiceNode_SubjectMatchesRequests(t *testing.T) {
// Matches.
for desc, tc := range map[string]struct {
evt EventPayloadCheckServiceNode
req stream.SubscribeRequest
}{
"default partition and namespace": {
EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
Service: "foo",
},
},
},
stream.SubscribeRequest{
Key: "foo",
EnterpriseMeta: structs.EnterpriseMeta{},
},
},
"mixed casing": {
EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
Service: "FoO",
},
},
},
stream.SubscribeRequest{Key: "foo"},
},
"override key": {
EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
Service: "foo",
},
},
overrideKey: "bar",
},
stream.SubscribeRequest{Key: "bar"},
},
} {
t.Run(desc, func(t *testing.T) {
require.Equal(t, tc.req.Subject(), tc.evt.Subject())
})
}

// Non-matches.
for desc, tc := range map[string]struct {
evt EventPayloadCheckServiceNode
req stream.SubscribeRequest
}{
"different key": {
EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
Service: "foo",
},
},
},
stream.SubscribeRequest{
Key: "bar",
},
},
"different partition": {
EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
Service: "foo",
},
},
overridePartition: "bar",
},
stream.SubscribeRequest{
Key: "foo",
},
},
"different namespace": {
EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
Service: "foo",
},
},
overrideNamespace: "bar",
},
stream.SubscribeRequest{
Key: "foo",
},
},
} {
t.Run(desc, func(t *testing.T) {
require.NotEqual(t, tc.req.Subject(), tc.evt.Subject())
})
}
}

func TestServiceHealthSnapshot(t *testing.T) {
store := NewStateStore(nil)

Expand Down Expand Up @@ -1771,7 +1866,7 @@ func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
// all events for a particular topic are grouped together. The sort is
// stable so events with the same key retain their relative order.
//
// This sort should match the logic in EventPayloadCheckServiceNode.MatchesKey
// This sort should match the logic in EventPayloadCheckServiceNode.Subject
// to avoid masking bugs.
var cmpPartialOrderEvents = cmp.Options{
cmpopts.SortSlices(func(i, j stream.Event) bool {
Expand Down Expand Up @@ -2418,107 +2513,6 @@ func newTestEventServiceHealthDeregister(index uint64, nodeNum int, svc string)
}
}

func TestEventPayloadCheckServiceNode_FilterByKey(t *testing.T) {
type testCase struct {
name string
payload EventPayloadCheckServiceNode
key string
namespace string
partition string // TODO(partitions): create test cases for this being set
expected bool
}

fn := func(t *testing.T, tc testCase) {
if tc.namespace != "" && pbcommon.DefaultEnterpriseMeta.Namespace == "" {
t.Skip("cant test namespace matching without namespace support")
}

require.Equal(t, tc.expected, tc.payload.MatchesKey(tc.key, tc.namespace, tc.partition))
}

var testCases = []testCase{
{
name: "no key or namespace",
payload: newPayloadCheckServiceNode("srv1", "ns1"),
expected: true,
},
{
name: "no key, with namespace match",
payload: newPayloadCheckServiceNode("srv1", "ns1"),
namespace: "ns1",
expected: true,
},
{
name: "no namespace, with key match",
payload: newPayloadCheckServiceNode("srv1", "ns1"),
key: "srv1",
expected: true,
},
{
name: "key match, namespace mismatch",
payload: newPayloadCheckServiceNode("srv1", "ns1"),
key: "srv1",
namespace: "ns2",
expected: false,
},
{
name: "key mismatch, namespace match",
payload: newPayloadCheckServiceNode("srv1", "ns1"),
key: "srv2",
namespace: "ns1",
expected: false,
},
{
name: "override key match",
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv1", ""),
key: "srv1",
namespace: "ns1",
expected: true,
},
{
name: "override key mismatch",
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv2", ""),
key: "proxy",
namespace: "ns1",
expected: false,
},
{
name: "override namespace match",
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "", "ns2"),
key: "proxy",
namespace: "ns2",
expected: true,
},
{
name: "override namespace mismatch",
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "", "ns3"),
key: "proxy",
namespace: "ns1",
expected: false,
},
{
name: "override both key and namespace match",
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv1", "ns2"),
key: "srv1",
namespace: "ns2",
expected: true,
},
{
name: "override both key and namespace mismatch namespace",
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv2", "ns3"),
key: "proxy",
namespace: "ns1",
expected: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fn(t, tc)
})
}
}

func newPayloadCheckServiceNode(service, namespace string) EventPayloadCheckServiceNode {
return EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Expand Down
20 changes: 4 additions & 16 deletions agent/consul/state/store_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,26 +419,14 @@ type nodePayload struct {
node *structs.ServiceNode
}

func (p nodePayload) MatchesKey(key, _, partition string) bool {
if key == "" && partition == "" {
return true
}

if p.node == nil {
return false
}

if structs.PartitionOrDefault(partition) != p.node.PartitionOrDefault() {
return false
}

return p.key == key
}

func (p nodePayload) HasReadPermission(acl.Authorizer) bool {
return true
}

func (p nodePayload) Subject() stream.Subject {
return stream.Subject(p.node.PartitionOrDefault() + "/" + p.node.NamespaceOrDefault() + "/" + p.key)
}

func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken {
token := &structs.ACLToken{
AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4",
Expand Down
Loading

0 comments on commit fdfe079

Please sign in to comment.