Enable batch delivery over WebSockets #1447
Conversation
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
…ve readahead set Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
Popped this back into draft, as I've found a pretty significant issue in how the batching assumptions work between `deliverBatchedEvents` and `bufferedDelivery`. So in reality you get really bad delays: if you send a single event followed shortly by more, you get partial batches with a full batch delay in between, instead of the single batch you should get.
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
Force-pushed from c9e9442 to 1c5f7e6
Codecov Report: all modified and coverable lines are covered by tests ✅

@@           Coverage Diff           @@
##             main    #1447   +/-   ##
=======================================
  Coverage   99.99%   99.99%
=======================================
  Files         321      321
  Lines       23108    23175      +67
=======================================
+ Hits        23106    23173      +67
  Misses          1        1
  Partials        1        1

☔ View full report in Codecov by Sentry.
…as previously inert Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
@@ -224,7 +224,7 @@ nav_order: 2
 |Key|Description|Type|Default Value|
 |---|-----------|----|-------------|
 |batchSize|The maximum number of records to read from the DB before performing an aggregation run|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`200`|
-|batchTimeout|How long to wait for new events to arrive before performing aggregation on a page of events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms`|
+|batchTimeout|How long to wait for new events to arrive before performing aggregation on a page of events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`0ms`|
This is actually the behavior you got before: the code that had been started (a very, very long time ago) to implement the batch timeout had been disabled. I'm making a separate comment to highlight where.
if ep.conf.eventBatchTimeout > 0 && lastEventCount > 0 {
	shortTimeout := time.NewTimer(ep.conf.eventBatchTimeout)
	select {
	case <-shortTimeout.C:
See how in this branch we just fall through to the longTimeoutDuration wait: it basically means the eventBatchTimeout was never used for its stated purpose. An incomplete implementation had existed previously, and was partially removed, leaving this code-cruft.
So in this PR I've come back round and implemented it with a short re-poll cycle (just once) after the batch timeout, for the case where the first trigger yields an incomplete batch.
It is disabled by default... but you could turn it on with readahead on subscriptions (as an optimization of the event loop), without needing to use batch-based delivery at all.
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
internal/events/event_dispatcher.go
Outdated
if err != nil {
	ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true})
}
Maybe a nit, but it feels like this could be moved outside of the loop (i.e. down to line 431). It took me a few reads to convince myself that all errors were handled across all branches.
But I'll leave it with you on what you think is ultimately clearer.
Ah, you actually point out a problem here: the reason it's in the loop is that we still need to nack the other events. So I need to fix the logic.
Done - would appreciate you taking a 2nd look if you don't mind
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
New change looks good. Thanks for the comments on the flow.
New support of `batch` for WebSockets

In #1359 @EnriqueL8 added support for Webhooks (outbound HTTPS requests) to configure `batch: true` on the subscription, such that events are delivered multiple at a time. This was deferred at the time for WebSockets (inbound long-lived connections) because they already had read-ahead performance optimization, so it was lower priority.

However, the use of `readahead` in its current form in WebSockets does not allow the consumer of events to optimize, for example by performing a single DB transaction in the application after processing a batch of events. You could implement a complex batch aggregation routine on the app-side, but that seems much more effort than FireFly just doing the batching for you.

So this PR closes the gap and supports `batch: true` for WebSockets as well. You can test it simply with a dev env:

ws://127.0.0.1:5000/ws?ephemeral=true&namespace=default&autoack&batch=true&batchtimeout=1s&readahead=50
Fix to batch-based delivery for WebHooks and WebSockets

In adding the WebSocket-specific code, I found the original code in #1359 was not actually working as designed. It added a layer of extra batch collation logic on top of the event poller in `eventDelivery.deliverBatchedEvents`, but given the way the dispatching code in `bufferedDelivery` worked, the behavior was broken.

If you had 1 event arrive, then after a very short delay 2 more events arrive (simple to recreate with a `/broadcast` in a dev env), you ended up getting a batch of one, then the whole batch delay waiting, then a batch of 2 after a second wait, instead of getting a single batch of 3 after the batch delay. So the batch delay was doubled to deliver two incomplete batches 😬
This was quite a big problem, and after some wrestling I ended up:

- Re-implementing batch delivery in `eventPoller`, which had ended up disabled a very long time ago
- Defaulting the `eventPoller` batch timeout to off (`batchTimeout=0` by default in config) as this is actually the behavior that you get today
- Removing the `eventDelivery.deliverBatchedEvents` function added in "feat: batching events and webhook plugin support" #1359
- Changing `eventDispatcher` to configure the `eventPoller` batching settings
- Changing `eventDelivery.bufferedDelivery` and `eventDelivery.deliverEvents` to handle batching