Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

events: Ignore send context #23500

Merged
merged 3 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog/23500.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
events: Ignore sending context to give more time for events to send
```
7 changes: 5 additions & 2 deletions vault/eventbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ func patchMountPath(data *logical.EventData, pluginInfo *logical.EventPluginInfo
// This function is meant to be used by trusted internal code, so it can specify details like the namespace
// and plugin info. Events from plugins should be routed through WithPlugin(), which will populate
// the namespace and plugin info automatically.
func (bus *EventBus) SendEventInternal(ctx context.Context, ns *namespace.Namespace, pluginInfo *logical.EventPluginInfo, eventType logical.EventType, data *logical.EventData) error {
// The context passed in is currently ignored to ensure that the event is sent if the context is short-lived,
// such as with an HTTP request context.
func (bus *EventBus) SendEventInternal(_ context.Context, ns *namespace.Namespace, pluginInfo *logical.EventPluginInfo, eventType logical.EventType, data *logical.EventData) error {
if ns == nil {
return namespace.ErrNoNamespace
}
Expand All @@ -130,7 +132,7 @@ func (bus *EventBus) SendEventInternal(ctx context.Context, ns *namespace.Namesp

// We can't easily know when the SendEvent is complete, so we can't call the cancel function.
// But, it is called automatically after bus.timeout, so there won't be any leak as long as bus.timeout is not too long.
ctx, _ = context.WithTimeout(ctx, bus.timeout)
ctx, _ := context.WithTimeout(context.Background(), bus.timeout)
_, err := bus.broker.Send(ctx, eventTypeAll, eventReceived)
if err != nil {
// if no listeners for this event type are registered, that's okay, the event
Expand All @@ -155,6 +157,7 @@ func (bus *EventBus) WithPlugin(ns *namespace.Namespace, eventPluginInfo *logica

// SendEvent sends an event to the event bus and routes it to all relevant subscribers.
// This function does *not* wait for all subscribers to acknowledge before returning.
// The context passed in is currently ignored.
func (bus *pluginEventBus) SendEvent(ctx context.Context, eventType logical.EventType, data *logical.EventData) error {
return bus.bus.SendEventInternal(ctx, bus.namespace, bus.pluginInfo, eventType, data)
}
Expand Down
41 changes: 41 additions & 0 deletions vault/eventbus/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,47 @@ func TestBusBasics(t *testing.T) {
}
}

// TestBusIgnoresSendContext tests that the context is ignored when sending to an event,
// so that we do not give up too quickly.
func TestBusIgnoresSendContext(t *testing.T) {
bus, err := NewEventBus(nil)
if err != nil {
t.Fatal(err)
}
eventType := logical.EventType("someType")

event, err := logical.NewEvent()
if err != nil {
t.Fatal(err)
}

bus.Start()

ch, subCancel, err := bus.Subscribe(context.Background(), namespace.RootNamespace, string(eventType), "")
if err != nil {
t.Fatal(err)
}
defer subCancel()

ctx, cancel := context.WithCancel(context.Background())
cancel() // cancel immediately

err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, eventType, event)
if err != nil {
t.Errorf("Expected no error sending: %v", err)
}

timeout := time.After(1 * time.Second)
select {
case message := <-ch:
if message.Payload.(*logical.EventReceived).Event.Id != event.Id {
t.Errorf("Got unexpected message: %+v", message)
}
case <-timeout:
t.Error("Timeout waiting for message")
}
}

// TestSubscribeNonRootNamespace verifies that events for non-root namespaces
// aren't filtered out by the bus.
func TestSubscribeNonRootNamespace(t *testing.T) {
Expand Down