Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streaming: split event buffer by key #12080

Merged
merged 17 commits into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/12080.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
streaming: Improved performance when the server is handling many concurrent subscriptions and has a high number of CPU cores
```
34 changes: 14 additions & 20 deletions agent/consul/state/catalog_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,26 @@ func (e EventPayloadCheckServiceNode) HasReadPermission(authz acl.Authorizer) bo
return e.Value.CanRead(authz) == acl.Allow
}

func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace, partition string) bool {
if key == "" && namespace == "" && partition == "" {
return true
func (e EventPayloadCheckServiceNode) Subject() stream.Subject {
partition := e.Value.Service.PartitionOrDefault()
if e.overridePartition != "" {
partition = e.overridePartition
}
partition = strings.ToLower(partition)

if e.Value.Service == nil {
return false
namespace := e.Value.Service.NamespaceOrDefault()
if e.overrideNamespace != "" {
namespace = e.overrideNamespace
}
namespace = strings.ToLower(namespace)

name := e.Value.Service.Service
key := e.Value.Service.Service
if e.overrideKey != "" {
name = e.overrideKey
}
ns := e.Value.Service.EnterpriseMeta.NamespaceOrDefault()
if e.overrideNamespace != "" {
ns = e.overrideNamespace
}
ap := e.Value.Service.EnterpriseMeta.PartitionOrDefault()
if e.overridePartition != "" {
ap = e.overridePartition
key = e.overrideKey
}
key = strings.ToLower(key)

return (key == "" || strings.EqualFold(key, name)) &&
(namespace == "" || strings.EqualFold(namespace, ns)) &&
(partition == "" || strings.EqualFold(partition, ap))
return stream.Subject(partition + "/" + namespace + "/" + key)
}

// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
Expand All @@ -67,8 +62,7 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc {
defer tx.Abort()

connect := topic == topicServiceHealthConnect
entMeta := structs.NewEnterpriseMetaWithPartition(req.Partition, req.Namespace)
idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, &entMeta)
idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, &req.EnterpriseMeta)
if err != nil {
return 0, err
}
Expand Down
200 changes: 97 additions & 103 deletions agent/consul/state/catalog_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,106 @@ import (
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/types"
)

func TestEventPayloadCheckServiceNode_SubjectMatchesRequests(t *testing.T) {
boxofrad marked this conversation as resolved.
Show resolved Hide resolved
// Matches.
for desc, tc := range map[string]struct {
evt EventPayloadCheckServiceNode
req stream.SubscribeRequest
}{
"default partition and namespace": {
EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
Service: "foo",
},
},
},
stream.SubscribeRequest{
Key: "foo",
EnterpriseMeta: structs.EnterpriseMeta{},
},
},
"mixed casing": {
EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
Service: "FoO",
},
},
},
stream.SubscribeRequest{Key: "foo"},
},
"override key": {
EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
Service: "foo",
},
},
overrideKey: "bar",
},
stream.SubscribeRequest{Key: "bar"},
},
} {
t.Run(desc, func(t *testing.T) {
require.Equal(t, tc.req.Subject(), tc.evt.Subject())
})
}

// Non-matches.
for desc, tc := range map[string]struct {
evt EventPayloadCheckServiceNode
req stream.SubscribeRequest
}{
"different key": {
EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
Service: "foo",
},
},
},
stream.SubscribeRequest{
Key: "bar",
},
},
"different partition": {
EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
Service: "foo",
},
},
overridePartition: "bar",
},
stream.SubscribeRequest{
Key: "foo",
},
},
"different namespace": {
EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
Service: "foo",
},
},
overrideNamespace: "bar",
},
stream.SubscribeRequest{
Key: "foo",
},
},
} {
t.Run(desc, func(t *testing.T) {
require.NotEqual(t, tc.req.Subject(), tc.evt.Subject())
})
}
}

func TestServiceHealthSnapshot(t *testing.T) {
store := NewStateStore(nil)

Expand Down Expand Up @@ -1673,7 +1768,7 @@ func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
// all events for a particular topic are grouped together. The sort is
// stable so events with the same key retain their relative order.
//
// This sort should match the logic in EventPayloadCheckServiceNode.MatchesKey
// This sort should match the logic in EventPayloadCheckServiceNode.Subject
// to avoid masking bugs.
var cmpPartialOrderEvents = cmp.Options{
cmpopts.SortSlices(func(i, j stream.Event) bool {
Expand Down Expand Up @@ -2315,107 +2410,6 @@ func newTestEventServiceHealthDeregister(index uint64, nodeNum int, svc string)
}
}

func TestEventPayloadCheckServiceNode_FilterByKey(t *testing.T) {
type testCase struct {
name string
payload EventPayloadCheckServiceNode
key string
namespace string
partition string // TODO(partitions): create test cases for this being set
expected bool
}

fn := func(t *testing.T, tc testCase) {
if tc.namespace != "" && pbcommon.DefaultEnterpriseMeta.Namespace == "" {
t.Skip("cant test namespace matching without namespace support")
}

require.Equal(t, tc.expected, tc.payload.MatchesKey(tc.key, tc.namespace, tc.partition))
}

var testCases = []testCase{
{
name: "no key or namespace",
payload: newPayloadCheckServiceNode("srv1", "ns1"),
expected: true,
},
{
name: "no key, with namespace match",
payload: newPayloadCheckServiceNode("srv1", "ns1"),
namespace: "ns1",
expected: true,
},
{
name: "no namespace, with key match",
payload: newPayloadCheckServiceNode("srv1", "ns1"),
key: "srv1",
expected: true,
},
{
name: "key match, namespace mismatch",
payload: newPayloadCheckServiceNode("srv1", "ns1"),
key: "srv1",
namespace: "ns2",
expected: false,
},
{
name: "key mismatch, namespace match",
payload: newPayloadCheckServiceNode("srv1", "ns1"),
key: "srv2",
namespace: "ns1",
expected: false,
},
{
name: "override key match",
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv1", ""),
key: "srv1",
namespace: "ns1",
expected: true,
},
{
name: "override key mismatch",
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv2", ""),
key: "proxy",
namespace: "ns1",
expected: false,
},
{
name: "override namespace match",
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "", "ns2"),
key: "proxy",
namespace: "ns2",
expected: true,
},
{
name: "override namespace mismatch",
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "", "ns3"),
key: "proxy",
namespace: "ns1",
expected: false,
},
{
name: "override both key and namespace match",
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv1", "ns2"),
key: "srv1",
namespace: "ns2",
expected: true,
},
{
name: "override both key and namespace mismatch namespace",
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv2", "ns3"),
key: "proxy",
namespace: "ns1",
expected: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fn(t, tc)
})
}
}

func newPayloadCheckServiceNode(service, namespace string) EventPayloadCheckServiceNode {
return EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Expand Down
20 changes: 4 additions & 16 deletions agent/consul/state/store_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,26 +422,14 @@ type nodePayload struct {
node *structs.ServiceNode
}

func (p nodePayload) MatchesKey(key, _, partition string) bool {
if key == "" && partition == "" {
return true
}

if p.node == nil {
return false
}

if structs.PartitionOrDefault(partition) != p.node.PartitionOrDefault() {
return false
}

return p.key == key
}

func (p nodePayload) HasReadPermission(acl.Authorizer) bool {
return true
}

func (p nodePayload) Subject() stream.Subject {
return stream.Subject(p.node.PartitionOrDefault() + "/" + p.node.NamespaceOrDefault() + "/" + p.key)
}

func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken {
token := &structs.ACLToken{
AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4",
Expand Down
Loading