diff --git a/.changelog/10935.txt b/.changelog/10935.txt new file mode 100644 index 00000000000..f52b7199a6f --- /dev/null +++ b/.changelog/10935.txt @@ -0,0 +1,3 @@ +```release-note:bug +events: Fixed wildcard namespace handling +``` diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index ac1c07c22fa..42b3a7e25c7 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -3378,6 +3378,7 @@ func TestFSM_ACLEvents(t *testing.T) { Topics: map[structs.Topic][]string{ tc.reqTopic: {"*"}, }, + Namespace: "default", } sub, err := broker.Subscribe(subReq) @@ -3431,6 +3432,7 @@ func TestFSM_EventBroker_JobRegisterFSMEvents(t *testing.T) { Topics: map[structs.Topic][]string{ structs.TopicJob: {"*"}, }, + Namespace: "default", } sub, err := broker.Subscribe(subReq) diff --git a/nomad/state/deployment_events_test.go b/nomad/state/deployment_events_test.go index 0a38890d63e..2238124491e 100644 --- a/nomad/state/deployment_events_test.go +++ b/nomad/state/deployment_events_test.go @@ -93,6 +93,7 @@ func EventsForIndex(t *testing.T, s *StateStore, index uint64) []structs.Event { Topics: map[structs.Topic][]string{ "*": {"*"}, }, + Namespace: "default", Index: index, StartExactlyAtIndex: true, }) diff --git a/nomad/stream/event_broker_test.go b/nomad/stream/event_broker_test.go index 0e34247106f..a136031cb37 100644 --- a/nomad/stream/event_broker_test.go +++ b/nomad/stream/event_broker_test.go @@ -270,6 +270,27 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), }, }, + { + desc: "subscribed to evals in all namespaces and removed access", + policyBeforeRules: mock.NamespacePolicy("*", "", []string{acl.NamespaceCapabilityReadJob}), + policyAfterRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}), + shouldUnsubscribe: true, + event: structs.Event{ + Topic: structs.TopicEvaluation, + Type: structs.TypeEvalUpdated, + Namespace: "foo", + Payload: structs.EvaluationEvent{ + Evaluation: &structs.Evaluation{ + ID: "some-id", + }, + }, + }, + policyEvent: structs.Event{ + Topic: structs.TopicACLToken, + Type: structs.TypeACLTokenUpserted, + Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: secretID}), + }, + }, { desc: "subscribed to deployments and no access change", policyBeforeRules: mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}), @@ -467,11 +488,18 @@ func TestEventBroker_handleACLUpdates_policyupdated(t *testing.T) { publisher, err := NewEventBroker(ctx, aclDelegate, EventBrokerCfg{}) require.NoError(t, err) + var ns string + if tc.event.Namespace != "" { + ns = tc.event.Namespace + } else { + ns = structs.DefaultNamespace + } + sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{ Topics: map[structs.Topic][]string{ tc.event.Topic: {"*"}, }, - Namespace: structs.DefaultNamespace, + Namespace: ns, Token: secretID, }) diff --git a/nomad/stream/subscription.go b/nomad/stream/subscription.go index 81942e9095c..a9e37342e65 100644 --- a/nomad/stream/subscription.go +++ b/nomad/stream/subscription.go @@ -123,14 +123,15 @@ func filter(req *SubscribeRequest, events []structs.Event) []structs.Event { allTopicKeys := req.Topics[structs.TopicAll] - if req.Namespace == "" && len(allTopicKeys) == 1 && allTopicKeys[0] == string(structs.TopicAll) { + // Return all events if subscribed to all namespaces and all topics + if req.Namespace == "*" && len(allTopicKeys) == 1 && allTopicKeys[0] == string(structs.TopicAll) { return events } var result []structs.Event for _, event := range events { - if req.Namespace != "" && event.Namespace != "" && event.Namespace != req.Namespace { + if req.Namespace != "*" && event.Namespace != "" && event.Namespace != req.Namespace { continue } diff --git a/nomad/stream/subscription_test.go b/nomad/stream/subscription_test.go index 0fc4e3af6ee..d1d40f350e4 100644 --- a/nomad/stream/subscription_test.go +++ b/nomad/stream/subscription_test.go @@ -147,6 +147,29 @@ func TestFilter_Namespace(t *testing.T) { require.Equal(t, 2, cap(actual)) } +func TestFilter_NamespaceAll(t *testing.T) { + events := make([]structs.Event, 0, 5) + events = append(events, + structs.Event{Topic: "Test", Key: "One", Namespace: "foo"}, + structs.Event{Topic: "Test", Key: "Two", Namespace: "bar"}, + structs.Event{Topic: "Test", Key: "Three", Namespace: "default"}, + ) + + req := &SubscribeRequest{ + Topics: map[structs.Topic][]string{ + "*": {"*"}, + }, + Namespace: "*", + } + actual := filter(req, events) + expected := []structs.Event{ + {Topic: "Test", Key: "One", Namespace: "foo"}, + {Topic: "Test", Key: "Two", Namespace: "bar"}, + {Topic: "Test", Key: "Three", Namespace: "default"}, + } + require.Equal(t, expected, actual) +} + func TestFilter_FilterKeys(t *testing.T) { events := make([]structs.Event, 0, 5) events = append(events, structs.Event{Topic: "Test", Key: "One", FilterKeys: []string{"extra-key"}}, structs.Event{Topic: "Test", Key: "Two"}, structs.Event{Topic: "Test", Key: "Two"}) diff --git a/website/content/api-docs/events.mdx b/website/content/api-docs/events.mdx index a64ce709ebf..d86c0453364 100644 --- a/website/content/api-docs/events.mdx +++ b/website/content/api-docs/events.mdx @@ -43,7 +43,8 @@ by default, requiring a management token. - `namespace` `(string: "default")` - Specifies the target namespace to filter on. Specifying `*` includes all namespaces for event types that support - namespaces. + namespaces. If you specify all namespaces (`*`) you'll either need a management + token, or an ACL Policy that explicitly applies to all namespaces (`*`). - `topic` `(topic:filter_key: "*:*")` - Specifies a topic to subscribe to and filter on. The default is to subscribe to all topics. Multiple topics may be @@ -96,10 +97,15 @@ by default, requiring a management token. ### Sample Request ```shell-session -# Subscribe to all events and topics +# Subscribe to all events and topics in the default namespace $ curl -s -v -N http://127.0.0.1:4646/v1/event/stream ``` +```shell-session +# Subscribe to all events and topics in all namespaces +$ curl -s -v -N http://127.0.0.1:4646/v1/event/stream?namespace=* +``` + ```shell-session # Start at index 100 and subscribe to all Evaluation events $ curl -s -v -N http://127.0.0.1:4646/v1/event/stream?index=100&topic=Evaluation