Enable batch delivery over WebSockets #1447

Merged: 9 commits, Jan 19, 2024
docs/reference/config.md (7 changes: 4 additions & 3 deletions)
@@ -224,7 +224,7 @@ nav_order: 2
|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|batchSize|The maximum number of records to read from the DB before performing an aggregation run|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`200`
|batchTimeout|How long to wait for new events to arrive before performing aggregation on a page of events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms`
|batchTimeout|How long to wait for new events to arrive before performing aggregation on a page of events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`0ms`
Contributor Author: This is actually the behavior you got before, because the code that had been added (a very, very long time ago) to start implementing batch-timeout had since been disabled. Making a separate comment to highlight where.

|firstEvent|The first event the aggregator should process, if no previous offset is stored in the DB. Valid options are `oldest` or `newest`|`string`|`oldest`
|pollTimeout|The time to wait without a notification of new events, before trying a select on the table|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
|rewindQueryLimit|Safety limit on the maximum number of records to search when performing queries to search for rewinds|`int`|`1000`
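
For intuition on the two timeouts in this table, here is a minimal Go sketch (not the FireFly implementation) of a poller loop that honours both. With the new `0ms` default, the wait for fuller pages is skipped entirely, which matches the long-standing actual behavior described in the comment above.

```go
package sketch

import "time"

// pollLoop is an illustrative sketch only: batchTimeout trades latency for
// fuller pages, while pollTimeout bounds the idle wait before the loop falls
// back to querying the table directly.
func pollLoop(notify <-chan struct{}, readPage func() []int,
	batchTimeout, pollTimeout time.Duration, dispatch func([]int)) {
	for {
		page := readPage()
		if len(page) > 0 {
			if batchTimeout > 0 {
				// Wait briefly for more events before dispatching.
				// With the new 0ms default this wait never happens.
				time.Sleep(batchTimeout)
				page = readPage()
			}
			dispatch(page)
			continue
		}
		select {
		case <-notify: // notified of new events
		case <-time.After(pollTimeout): // idle: re-select on the table anyway
		}
	}
}
```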
@@ -249,7 +249,7 @@ nav_order: 2

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|batchTimeout|A short time to wait for new events to arrive before re-polling for new events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms`
|batchTimeout|A short time to wait for new events to arrive before re-polling for new events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`0ms`
|bufferLength|The number of events + attachments an individual dispatcher should hold in memory ready for delivery to the subscription|`int`|`5`
|pollTimeout|The time to wait without a notification of new events, before trying a select on the table|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`

@@ -1387,7 +1387,8 @@ nav_order: 2

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|batchSize|Default read ahead to enable for subscriptions that do not explicitly configure readahead|`int`|`0`
|batchSize|Default read ahead to enable for subscriptions that do not explicitly configure readahead|`int`|`50`
|batchTimeout|Default batch timeout|`int`|`50ms`

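As a sketch of how these defaults resolve at runtime, the viper keys from this PR can be read directly; the getter wiring below is illustrative, not FireFly's actual plumbing.

```go
package sketch

import (
	"time"

	"github.com/spf13/viper"
)

// subscriptionDefaults reads the two defaults added/changed in this PR by
// their viper keys; comments show the new default values set in coreconfig.
func subscriptionDefaults() (int, time.Duration) {
	batchSize := viper.GetInt("subscription.defaults.batchSize")            // 50
	batchTimeout := viper.GetDuration("subscription.defaults.batchTimeout") // 50ms
	return batchSize, batchTimeout
}
```
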
## subscription.retry

docs/reference/types/wsack.md (2 changes: 1 addition & 1 deletion)
@@ -37,7 +37,7 @@ nav_order: 24

| Field Name | Description | Type |
|------------|-------------|------|
| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"` |
| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"`<br/>`"event_batch"` |
| `id` | WSAck.id | [`UUID`](simpletypes#uuid) |
| `subscription` | WSAck.subscription | [`SubscriptionRef`](#subscriptionref) |

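The practical impact of the enum addition is that clients must be prepared to route a new frame type. A minimal client-side sketch, assuming only that every frame carries the documented `type` discriminator:

```go
package sketch

import "encoding/json"

// wsMessageBase mirrors just the "type" discriminator shared by the message
// types documented above; any payload beyond that is out of scope here.
type wsMessageBase struct {
	Type string `json:"type"`
}

// route distinguishes the new "event_batch" frame added by this PR from the
// pre-existing message types.
func route(raw []byte) (string, error) {
	var base wsMessageBase
	if err := json.Unmarshal(raw, &base); err != nil {
		return "", err
	}
	switch base.Type {
	case "start", "ack", "protocol_error":
		// pre-existing protocol frames
	case "event_batch":
		// new in this PR: a whole batch of event deliveries in one frame
	}
	return base.Type, nil
}
```
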
docs/reference/types/wserror.md (2 changes: 1 addition & 1 deletion)
@@ -33,6 +33,6 @@ nav_order: 25

| Field Name | Description | Type |
|------------|-------------|------|
| `type` | WSAck.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"` |
| `type` | WSAck.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"`<br/>`"event_batch"` |
| `error` | WSAck.error | `string` |

docs/reference/types/wsstart.md (2 changes: 1 addition & 1 deletion)
@@ -42,7 +42,7 @@ nav_order: 23

| Field Name | Description | Type |
|------------|-------------|------|
| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"` |
| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"`<br/>`"event_batch"` |
| `autoack` | WSStart.autoack | `bool` |
| `namespace` | WSStart.namespace | `string` |
| `name` | WSStart.name | `string` |
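
To make the start handshake concrete, here is a sketch built only from the WSStart fields shown above. The namespace and subscription name are hypothetical values; note that enabling batch delivery is a subscription option handled in the dispatcher diff below, not one of the fields shown here.

```go
package sketch

// startFrame composes a start message from the documented WSStart fields.
// "ns1" and "app1-events" are hypothetical values for this sketch.
var startFrame = map[string]interface{}{
	"type":      "start",
	"autoack":   false,
	"namespace": "ns1",
	"name":      "app1-events",
}
```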
internal/coreconfig/coreconfig.go (15 changes: 9 additions & 6 deletions)
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -327,8 +327,10 @@ var (
OrgDescription = ffc("org.description")
// OrchestratorStartupAttempts is how many time to attempt to connect to core infrastructure on startup
OrchestratorStartupAttempts = ffc("orchestrator.startupAttempts")
// SubscriptionDefaultsReadAhead default read ahead to enable for subscriptions that do not explicitly configure readahead
SubscriptionDefaultsReadAhead = ffc("subscription.defaults.batchSize")
// SubscriptionDefaultsBatchSize default read ahead to enable for subscriptions that do not explicitly configure readahead
SubscriptionDefaultsBatchSize = ffc("subscription.defaults.batchSize")
// SubscriptionDefaultsBatchTimeout default batch timeout
SubscriptionDefaultsBatchTimeout = ffc("subscription.defaults.batchTimeout")
// SubscriptionMax maximum number of pre-defined subscriptions that can exist (note for high fan-out consider connecting a dedicated pub/sub broker to the dispatcher)
SubscriptionMax = ffc("subscription.max")
// SubscriptionsRetryInitialDelay is the initial retry delay
@@ -404,7 +406,7 @@ func setDefaults() {
viper.SetDefault(string(DownloadRetryFactor), 2.0)
viper.SetDefault(string(EventAggregatorFirstEvent), core.SubOptsFirstEventOldest)
viper.SetDefault(string(EventAggregatorBatchSize), 200)
viper.SetDefault(string(EventAggregatorBatchTimeout), "250ms")
viper.SetDefault(string(EventAggregatorBatchTimeout), "0ms")
viper.SetDefault(string(EventAggregatorPollTimeout), "30s")
viper.SetDefault(string(EventAggregatorRewindTimeout), "50ms")
viper.SetDefault(string(EventAggregatorRewindQueueLength), 10)
@@ -414,7 +416,7 @@ func setDefaults() {
viper.SetDefault(string(EventAggregatorRetryMaxDelay), "30s")
viper.SetDefault(string(EventDBEventsBufferSize), 100)
viper.SetDefault(string(EventDispatcherBufferLength), 5)
viper.SetDefault(string(EventDispatcherBatchTimeout), "250ms")
viper.SetDefault(string(EventDispatcherBatchTimeout), "0ms")
viper.SetDefault(string(EventDispatcherPollTimeout), "30s")
viper.SetDefault(string(EventTransportsEnabled), []string{"websockets", "webhooks"})
viper.SetDefault(string(EventTransportsDefault), "websockets")
@@ -451,7 +453,8 @@ func setDefaults() {
viper.SetDefault(string(PrivateMessagingBatchSize), 200)
viper.SetDefault(string(PrivateMessagingBatchTimeout), "1s")
viper.SetDefault(string(PrivateMessagingBatchPayloadLimit), "800Kb")
viper.SetDefault(string(SubscriptionDefaultsReadAhead), 0)
viper.SetDefault(string(SubscriptionDefaultsBatchSize), 50)
viper.SetDefault(string(SubscriptionDefaultsBatchTimeout), "50ms")
viper.SetDefault(string(SubscriptionMax), 500)
viper.SetDefault(string(SubscriptionsRetryInitialDelay), "250ms")
viper.SetDefault(string(SubscriptionsRetryMaxDelay), "30s")
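
Note the constant rename (SubscriptionDefaultsReadAhead to SubscriptionDefaultsBatchSize) keeps the same underlying key, so existing configuration files continue to work. A sketch of overriding the new defaults through standard viper usage, with illustrative values:

```go
package sketch

import "github.com/spf13/viper"

// overrideSubscriptionDefaults shows the config keys, which are unchanged by
// the constant rename; the values here are illustrative only.
func overrideSubscriptionDefaults() {
	viper.Set("subscription.defaults.batchSize", 100)
	viper.Set("subscription.defaults.batchTimeout", "25ms")
}
```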
internal/coremsgs/en_config_descriptions.go (7 changes: 4 additions & 3 deletions)
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -383,8 +383,9 @@ var (
ConfigPluginSharedstorageIpfsGatewayURL = ffc("config.plugins.sharedstorage[].ipfs.gateway.url", "The URL for the IPFS Gateway", urlStringType)
ConfigPluginSharedstorageIpfsGatewayProxyURL = ffc("config.plugins.sharedstorage[].ipfs.gateway.proxy.url", "Optional HTTP proxy server to use when connecting to the IPFS Gateway", urlStringType)

ConfigSubscriptionMax = ffc("config.subscription.max", "The maximum number of pre-defined subscriptions that can exist (note for high fan-out consider connecting a dedicated pub/sub broker to the dispatcher)", i18n.IntType)
ConfigSubscriptionDefaultsBatchSize = ffc("config.subscription.defaults.batchSize", "Default read ahead to enable for subscriptions that do not explicitly configure readahead", i18n.IntType)
ConfigSubscriptionMax = ffc("config.subscription.max", "The maximum number of pre-defined subscriptions that can exist (note for high fan-out consider connecting a dedicated pub/sub broker to the dispatcher)", i18n.IntType)
ConfigSubscriptionDefaultsBatchSize = ffc("config.subscription.defaults.batchSize", "Default read ahead to enable for subscriptions that do not explicitly configure readahead", i18n.IntType)
ConfigSubscriptionDefaultsBatchTimeout = ffc("config.subscription.defaults.batchTimeout", "Default batch timeout", i18n.IntType)

ConfigTokensName = ffc("config.tokens[].name", "A name to identify this token plugin", i18n.StringType)
ConfigTokensPlugin = ffc("config.tokens[].plugin", "The type of the token plugin to use", i18n.StringType)
internal/events/event_dispatcher.go (142 changes: 58 additions & 84 deletions)
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -65,25 +65,21 @@ type eventDispatcher struct {
elected bool
eventPoller *eventPoller
inflight map[fftypes.UUID]*core.Event
eventDelivery chan *core.EventDelivery
eventDelivery chan []*core.EventDelivery
mux sync.Mutex
namespace string
readAhead int
batch bool
batchTimeout time.Duration
subscription *subscription
txHelper txcommon.Helper
}

func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events.Plugin, di database.Plugin, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager, connID string, sub *subscription, en *eventNotifier, txHelper txcommon.Helper) *eventDispatcher {
ctx, cancelCtx := context.WithCancel(ctx)
readAhead := config.GetUint(coreconfig.SubscriptionDefaultsReadAhead)
readAhead := uint(0)
if sub.definition.Options.ReadAhead != nil {
readAhead = uint(*sub.definition.Options.ReadAhead)
}
if readAhead > maxReadAhead {
readAhead = maxReadAhead
}

batchTimeout := defaultBatchTimeout
if sub.definition.Options.BatchTimeout != nil && *sub.definition.Options.BatchTimeout != "" {
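
The struct change above, from `chan *core.EventDelivery` to `chan []*core.EventDelivery`, is what lets one channel serve both delivery modes. A distilled sketch of the pattern, with a placeholder event type:

```go
package sketch

type eventDelivery struct{ id string }

// send shows the two uses of a slice-carrying channel: one-element slices
// for per-event delivery, or the whole in-flight page as a single send in
// batch mode.
func send(ch chan []*eventDelivery, events []*eventDelivery, batch bool) {
	if batch {
		ch <- events // one send, whole batch
		return
	}
	for _, e := range events {
		ch <- []*eventDelivery{e} // one send per event
	}
}
```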
@@ -108,13 +104,12 @@ func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events.
subscription: sub,
namespace: sub.definition.Namespace,
inflight: make(map[fftypes.UUID]*core.Event),
eventDelivery: make(chan *core.EventDelivery, readAhead+1),
eventDelivery: make(chan []*core.EventDelivery, readAhead+1),
readAhead: int(readAhead),
acksNacks: make(chan ackNack),
closed: make(chan struct{}),
txHelper: txHelper,
batch: batch,
batchTimeout: batchTimeout,
}

pollerConf := &eventPollerConf{
@@ -138,6 +133,20 @@ func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events.
firstEvent: sub.definition.Options.FirstEvent,
}

// Users can tune the batch-related settings.
// These always apply in batch:true cases, and optionally the batchTimeout setting can be used
// to tweak how we optimize for readahead / latency detection without batching
// (most likely users with this requirement would be better off just moving to batch:true).
if batchTimeout > 0 {
pollerConf.eventBatchTimeout = batchTimeout
if batchTimeout > pollerConf.eventPollTimeout {
pollerConf.eventPollTimeout = batchTimeout
}
}
if batch || pollerConf.eventBatchSize < int(readAhead) {
pollerConf.eventBatchSize = ed.readAhead
}

ed.eventPoller = newEventPoller(ctx, di, en, pollerConf)
return ed
}
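
The tuning block above reduces to two rules: a batch timeout longer than the poll timeout must stretch the poll timeout (so the poller does not give up mid-batch), and batch mode or a large readahead grows the page size. A distilled sketch, not the actual FireFly function:

```go
package sketch

import "time"

// tunePoller mirrors the two rules applied in newEventDispatcher above, as a
// standalone illustration.
func tunePoller(batch bool, batchTimeout, pollTimeout time.Duration,
	batchSize, readAhead int) (time.Duration, int) {
	if batchTimeout > 0 && batchTimeout > pollTimeout {
		pollTimeout = batchTimeout // e.g. a 2m batch timeout stretches a 30s poll
	}
	if batch || batchSize < readAhead {
		batchSize = readAhead
	}
	return pollTimeout, batchSize
}
```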
@@ -165,11 +174,8 @@ func (ed *eventDispatcher) electAndStart() {
ed.elected = true
ed.eventPoller.start()

if ed.batch {
go ed.deliverBatchedEvents()
} else {
go ed.deliverEvents()
}
go ed.deliverEvents()

// Wait until the event poller closes
<-ed.eventPoller.closed
}
@@ -326,7 +332,15 @@ func (ed *eventDispatcher) bufferedDelivery(events []core.LocallySequenced) (boo
ed.mux.Unlock()

dispatched++
ed.eventDelivery <- event
if !ed.batch {
// dispatch individually
ed.eventDelivery <- []*core.EventDelivery{event}
}
}

if ed.batch && len(dispatchable) > 0 {
// Dispatch the whole batch now marked in-flight
ed.eventDelivery <- dispatchable
}

if inflightCount == 0 {
@@ -384,88 +398,48 @@ func (ed *eventDispatcher) handleAckOffsetUpdate(ack ackNack) {
}
}

func (ed *eventDispatcher) deliverBatchedEvents() {
withData := ed.subscription.definition.Options.WithData != nil && *ed.subscription.definition.Options.WithData

var events []*core.CombinedEventDataDelivery
var batchTimeoutContext context.Context
var batchTimeoutCancel func()
for {
var timeoutContext context.Context
var timedOut bool
if batchTimeoutContext != nil {
timeoutContext = batchTimeoutContext
} else {
timeoutContext = ed.ctx
}
select {
case event, ok := <-ed.eventDelivery:
if !ok {
if batchTimeoutCancel != nil {
batchTimeoutCancel()
}
return
}

if events == nil {
events = []*core.CombinedEventDataDelivery{}
batchTimeoutContext, batchTimeoutCancel = context.WithTimeout(ed.ctx, ed.batchTimeout)
}

log.L(ed.ctx).Debugf("Dispatching %s event in a batch: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), event.Sequence, event.ID, event.Type, event.Namespace, event.Reference)

var data []*core.Data
var err error
if withData && event.Message != nil {
data, _, err = ed.data.GetMessageDataCached(ed.ctx, event.Message)
}

events = append(events, &core.CombinedEventDataDelivery{Event: event, Data: data})

if err != nil {
ed.deliveryResponse(&core.EventDeliveryResponse{ID: event.ID, Rejected: true})
}

case <-timeoutContext.Done():
timedOut = true
case <-ed.ctx.Done():
if batchTimeoutCancel != nil {
batchTimeoutCancel()
}
return
}

if len(events) == ed.readAhead || (timedOut && len(events) > 0) {
_ = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, events)
// If err handle all the delivery responses for all the events??
events = nil
}
}
}

// TODO issue here, we can't just call DeliveryRequest with one thing.
func (ed *eventDispatcher) deliverEvents() {
withData := ed.subscription.definition.Options.WithData != nil && *ed.subscription.definition.Options.WithData
for {
select {
case event, ok := <-ed.eventDelivery:
case events, ok := <-ed.eventDelivery:
if !ok {
return
}

log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), event.Sequence, event.ID, event.Type, event.Namespace, event.Reference)
var data []*core.Data
// As soon as we hit an error, we need to trigger into nack mode
var err error
if withData && event.Message != nil {
data, _, err = ed.data.GetMessageDataCached(ed.ctx, event.Message)
eventsWithData := make([]*core.CombinedEventDataDelivery, len(events))
for i := 0; i < len(events) && err == nil; i++ {
e := &core.CombinedEventDataDelivery{
Event: events[i],
}
eventsWithData[i] = e
log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), e.Event.Sequence, e.Event.ID, e.Event.Type, e.Event.Namespace, e.Event.Reference)
if withData && e.Event.Message != nil {
e.Data, _, err = ed.data.GetMessageDataCached(ed.ctx, e.Event.Message)
}
// Individual events (in reality there is only ever i==0 for this case)
if err == nil && !ed.batch {
err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, e.Event, e.Data)
}
if err != nil {
ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true})
}
}
Contributor: Maybe a nit, but it feels like this could be moved outside of the loop (i.e. down to line 431). It took me a few reads to convince myself that all errors were handled across all branches.

Contributor: But will leave it with you on what you think is ultimately clearer.

Contributor Author: Ah, you actually point out a problem here: the reason it's in the loop is that we still need to nack the other events. So I need to fix the logic.

Contributor Author: Done. Would appreciate you taking a second look if you don't mind.
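
The fix that came out of this thread is visible below: the per-event nack stays inside the loop, and a batch-level failure nacks everything. A distilled sketch of the invariant, assuming a callback that records delivery responses:

```go
package sketch

type deliveryResponse struct {
	id       string
	rejected bool
}

// nackAll illustrates the invariant from the review thread: once anything in
// a batch fails, every event must still receive a (rejected) delivery
// response, or the dispatcher's in-flight tracking never drains.
func nackAll(ids []string, respond func(deliveryResponse)) {
	for _, id := range ids {
		respond(deliveryResponse{id: id, rejected: true})
	}
}
```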

if err == nil {
err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, event, data)
}
if err != nil {
ed.deliveryResponse(&core.EventDeliveryResponse{ID: event.ID, Rejected: true})
// In batch mode we do one dispatch of the whole set as one
if err == nil && ed.batch {
err = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, eventsWithData)
if err != nil {
// nack everything on behalf of the failed delivery
for _, e := range events {
ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true})
}
}
}

case <-ed.ctx.Done():
return
}
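
Pulling the two call sites together, a transport plugin now serves delivery along two paths. The interface sketch below is reduced to the call shapes visible in this diff, with placeholder types standing in for the core types; the authoritative definition lives in the events plugin package.

```go
package sketch

import "context"

// Placeholders standing in for core.Subscription, core.EventDelivery,
// core.Data and core.CombinedEventDataDelivery.
type (
	subscription  struct{}
	eventDelivery struct{}
	data          struct{}
)

type combinedDelivery struct {
	Event *eventDelivery
	Data  []*data
}

// transport sketches the two delivery paths: per-event DeliveryRequest, and
// one BatchDeliveryRequest call for the whole page when batch:true is set.
type transport interface {
	DeliveryRequest(ctx context.Context, connID string, sub *subscription,
		event *eventDelivery, d []*data) error
	BatchDeliveryRequest(ctx context.Context, connID string, sub *subscription,
		events []*combinedDelivery) error
}
```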