From 757c3b715a3f9a3945e1df02bb5302a2ebb8667b Mon Sep 17 00:00:00 2001 From: Christopher Swenson Date: Wed, 4 Oct 2023 09:14:16 -0700 Subject: [PATCH 1/3] events: Ignore send context When sending an event asynchornously, the original context used for whatever generated the event (probably a synchronous, quick HTTP context) is probably not what is wanted for sending the event, which could face delays if a consumer is backed up. I will admit myself to sometimes having "context blindness", where I just take whatever context is incoming in a function and thread it out to all calls. Normally this is the right thing to do when, say, tying downstream API calls to an upstream HTTP timeout. When making KV events, for example, we used the HTTP context for `SendEvent()`, and this can cause the events to be dropped if they aren't taken from the channel before the HTTP request finishes. In retrospect, it was probably unnecessary to include a context in the `SendEvent` interface. We keep the context in place for backwards compability, but also in case we want to use it for purposes other than timeouts and cancellations in the future. --- changelog/xyz.txt | 3 +++ vault/eventbus/bus.go | 6 ++++-- vault/eventbus/bus_test.go | 41 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 48 insertions(+), 2 deletions(-) create mode 100644 changelog/xyz.txt diff --git a/changelog/xyz.txt b/changelog/xyz.txt new file mode 100644 index 000000000000..52f95c9c4c47 --- /dev/null +++ b/changelog/xyz.txt @@ -0,0 +1,3 @@ +```release-note:bug +events: Ignore sending context to give more time for events to send +``` diff --git a/vault/eventbus/bus.go b/vault/eventbus/bus.go index f5a03e570dbb..5fbc7c9d071e 100644 --- a/vault/eventbus/bus.go +++ b/vault/eventbus/bus.go @@ -114,7 +114,8 @@ func patchMountPath(data *logical.EventData, pluginInfo *logical.EventPluginInfo // This function is meant to be used by trusted internal code, so it can specify details like the namespace // and plugin info. Events from plugins should be routed through WithPlugin(), which will populate // the namespace and plugin info automatically. -func (bus *EventBus) SendEventInternal(ctx context.Context, ns *namespace.Namespace, pluginInfo *logical.EventPluginInfo, eventType logical.EventType, data *logical.EventData) error { +// The context passed in is currently ignored. +func (bus *EventBus) SendEventInternal(_ context.Context, ns *namespace.Namespace, pluginInfo *logical.EventPluginInfo, eventType logical.EventType, data *logical.EventData) error { if ns == nil { return namespace.ErrNoNamespace } @@ -130,7 +131,7 @@ func (bus *EventBus) SendEventInternal(ctx context.Context, ns *namespace.Namesp // We can't easily know when the SendEvent is complete, so we can't call the cancel function. // But, it is called automatically after bus.timeout, so there won't be any leak as long as bus.timeout is not too long. - ctx, _ = context.WithTimeout(ctx, bus.timeout) + ctx, _ := context.WithTimeout(context.Background(), bus.timeout) _, err := bus.broker.Send(ctx, eventTypeAll, eventReceived) if err != nil { // if no listeners for this event type are registered, that's okay, the event @@ -155,6 +156,7 @@ func (bus *EventBus) WithPlugin(ns *namespace.Namespace, eventPluginInfo *logica // SendEvent sends an event to the event bus and routes it to all relevant subscribers. // This function does *not* wait for all subscribers to acknowledge before returning. +// The context passed in is currently ignored. func (bus *pluginEventBus) SendEvent(ctx context.Context, eventType logical.EventType, data *logical.EventData) error { return bus.bus.SendEventInternal(ctx, bus.namespace, bus.pluginInfo, eventType, data) } diff --git a/vault/eventbus/bus_test.go b/vault/eventbus/bus_test.go index 1dbe9170e560..d6e5c9c9cbaf 100644 --- a/vault/eventbus/bus_test.go +++ b/vault/eventbus/bus_test.go @@ -75,6 +75,47 @@ func TestBusBasics(t *testing.T) { } } +// TestBusIgnoresSendContext tests that the context is ignored when sending to an event, +// so that we do not give up too quickly. +func TestBusIgnoresSendContext(t *testing.T) { + bus, err := NewEventBus(nil) + if err != nil { + t.Fatal(err) + } + eventType := logical.EventType("someType") + + event, err := logical.NewEvent() + if err != nil { + t.Fatal(err) + } + + bus.Start() + + ch, subCancel, err := bus.Subscribe(context.Background(), namespace.RootNamespace, string(eventType), "") + if err != nil { + t.Fatal(err) + } + defer subCancel() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // cancel immediately + + err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, eventType, event) + if err != nil { + t.Errorf("Expected no error sending: %v", err) + } + + timeout := time.After(1 * time.Second) + select { + case message := <-ch: + if message.Payload.(*logical.EventReceived).Event.Id != event.Id { + t.Errorf("Got unexpected message: %+v", message) + } + case <-timeout: + t.Error("Timeout waiting for message") + } +} + // TestSubscribeNonRootNamespace verifies that events for non-root namespaces // aren't filtered out by the bus. func TestSubscribeNonRootNamespace(t *testing.T) { From 93528e8128204b889cdca1435524e07f1e634a3c Mon Sep 17 00:00:00 2001 From: Christopher Swenson Date: Wed, 4 Oct 2023 09:23:43 -0700 Subject: [PATCH 2/3] Move changelog --- changelog/{xyz.txt => 23500.txt} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changelog/{xyz.txt => 23500.txt} (100%) diff --git a/changelog/xyz.txt b/changelog/23500.txt similarity index 100% rename from changelog/xyz.txt rename to changelog/23500.txt From 8ef67a1bf6b4d9c4309a8405341bff5ccb1c1f16 Mon Sep 17 00:00:00 2001 From: Christopher Swenson Date: Thu, 5 Oct 2023 13:21:15 -0700 Subject: [PATCH 3/3] Add additional comment about event context --- vault/eventbus/bus.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/vault/eventbus/bus.go b/vault/eventbus/bus.go index 5fbc7c9d071e..0185e9fbadf4 100644 --- a/vault/eventbus/bus.go +++ b/vault/eventbus/bus.go @@ -114,7 +114,8 @@ func patchMountPath(data *logical.EventData, pluginInfo *logical.EventPluginInfo // This function is meant to be used by trusted internal code, so it can specify details like the namespace // and plugin info. Events from plugins should be routed through WithPlugin(), which will populate // the namespace and plugin info automatically. -// The context passed in is currently ignored. +// The context passed in is currently ignored to ensure that the event is sent if the context is short-lived, +// such as with an HTTP request context. func (bus *EventBus) SendEventInternal(_ context.Context, ns *namespace.Namespace, pluginInfo *logical.EventPluginInfo, eventType logical.EventType, data *logical.EventData) error { if ns == nil { return namespace.ErrNoNamespace