diff --git a/docs/reference/config.md b/docs/reference/config.md
index fbb937f57..45f63e89b 100644
--- a/docs/reference/config.md
+++ b/docs/reference/config.md
@@ -224,7 +224,7 @@ nav_order: 2
|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|batchSize|The maximum number of records to read from the DB before performing an aggregation run|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`200`
-|batchTimeout|How long to wait for new events to arrive before performing aggregation on a page of events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms`
+|batchTimeout|How long to wait for new events to arrive before performing aggregation on a page of events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`0ms`
|firstEvent|The first event the aggregator should process, if no previous offset is stored in the DB. Valid options are `oldest` or `newest`|`string`|`oldest`
|pollTimeout|The time to wait without a notification of new events, before trying a select on the table|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
|rewindQueryLimit|Safety limit on the maximum number of records to search when performing queries to search for rewinds|`int`|`1000`
@@ -249,7 +249,7 @@ nav_order: 2
|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
-|batchTimeout|A short time to wait for new events to arrive before re-polling for new events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms`
+|batchTimeout|A short time to wait for new events to arrive before re-polling for new events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`0ms`
|bufferLength|The number of events + attachments an individual dispatcher should hold in memory ready for delivery to the subscription|`int`|`5`
|pollTimeout|The time to wait without a notification of new events, before trying a select on the table|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
@@ -1387,7 +1387,8 @@ nav_order: 2
|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
-|batchSize|Default read ahead to enable for subscriptions that do not explicitly configure readahead|`int`|`0`
+|batchSize|Default read ahead to enable for subscriptions that do not explicitly configure readahead|`int`|`50`
+|batchTimeout|Default batch timeout for subscriptions that do not explicitly configure batchTimeout|[`time.Duration`](https://pkg.go.dev/time#Duration)|`50ms`
## subscription.retry
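In YAML terms, the defaults introduced above are equivalent to the following core config snippet (a sketch using the key paths from these tables; since these are the shipped default values, setting them explicitly is a no-op):

```yaml
subscription:
  defaults:
    batchSize: 50       # read ahead applied when a subscription sets none itself
    batchTimeout: 50ms  # how long a partial batch may wait to fill before dispatch
event:
  aggregator:
    batchTimeout: 0ms   # no artificial delay before aggregating a page of events
  dispatcher:
    batchTimeout: 0ms   # no artificial delay before dispatching to a subscription
```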
diff --git a/docs/reference/types/wsack.md b/docs/reference/types/wsack.md
index 5b2385331..fc44ff2f0 100644
--- a/docs/reference/types/wsack.md
+++ b/docs/reference/types/wsack.md
@@ -37,7 +37,7 @@ nav_order: 24
| Field Name | Description | Type |
|------------|-------------|------|
-| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"` |
+| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"`<br/>`"event_batch"` |
| `id` | WSAck.id | [`UUID`](simpletypes#uuid) |
| `subscription` | WSAck.subscription | [`SubscriptionRef`](#subscriptionref) |
diff --git a/docs/reference/types/wserror.md b/docs/reference/types/wserror.md
index e7052b0a1..ed2cb06e1 100644
--- a/docs/reference/types/wserror.md
+++ b/docs/reference/types/wserror.md
@@ -33,6 +33,6 @@ nav_order: 25
| Field Name | Description | Type |
|------------|-------------|------|
-| `type` | WSAck.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"` |
+| `type` | WSAck.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"`<br/>`"event_batch"` |
| `error` | WSAck.error | `string` |
diff --git a/docs/reference/types/wsstart.md b/docs/reference/types/wsstart.md
index 3c606a79b..f8f3798f7 100644
--- a/docs/reference/types/wsstart.md
+++ b/docs/reference/types/wsstart.md
@@ -42,7 +42,7 @@ nav_order: 23
| Field Name | Description | Type |
|------------|-------------|------|
-| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"` |
+| `type` | WSActionBase.type | `FFEnum`:<br/>`"start"`<br/>`"ack"`<br/>`"protocol_error"`<br/>`"event_batch"` |
| `autoack` | WSStart.autoack | `bool` |
| `namespace` | WSStart.namespace | `string` |
| `name` | WSStart.name | `string` |
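Taken together, these three type pages describe the new `event_batch` protocol flow. As an illustrative sketch (the UUIDs are placeholders and the per-event payloads are elided with `...`), a client that starts a batched ephemeral subscription receives a single `event_batch` frame and acks the whole batch by its `id`, rather than acking each event:

```
client → {"type":"start","namespace":"ns1","ephemeral":true,"options":{"batch":true}}
server → {"type":"event_batch","id":"4ae50cd1-...","subscription":{"namespace":"ns1"},"events":[{...},{...}]}
client → {"type":"ack","id":"4ae50cd1-..."}
```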
diff --git a/internal/coreconfig/coreconfig.go b/internal/coreconfig/coreconfig.go
index 1a8e904ec..6ae2b8a78 100644
--- a/internal/coreconfig/coreconfig.go
+++ b/internal/coreconfig/coreconfig.go
@@ -1,4 +1,4 @@
-// Copyright © 2023 Kaleido, Inc.
+// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -327,8 +327,10 @@ var (
OrgDescription = ffc("org.description")
// OrchestratorStartupAttempts is how many time to attempt to connect to core infrastructure on startup
OrchestratorStartupAttempts = ffc("orchestrator.startupAttempts")
- // SubscriptionDefaultsReadAhead default read ahead to enable for subscriptions that do not explicitly configure readahead
- SubscriptionDefaultsReadAhead = ffc("subscription.defaults.batchSize")
+ // SubscriptionDefaultsBatchSize default read ahead to enable for subscriptions that do not explicitly configure readahead
+ SubscriptionDefaultsBatchSize = ffc("subscription.defaults.batchSize")
+ // SubscriptionDefaultsBatchTimeout default batch timeout
+ SubscriptionDefaultsBatchTimeout = ffc("subscription.defaults.batchTimeout")
// SubscriptionMax maximum number of pre-defined subscriptions that can exist (note for high fan-out consider connecting a dedicated pub/sub broker to the dispatcher)
SubscriptionMax = ffc("subscription.max")
// SubscriptionsRetryInitialDelay is the initial retry delay
@@ -404,7 +406,7 @@ func setDefaults() {
viper.SetDefault(string(DownloadRetryFactor), 2.0)
viper.SetDefault(string(EventAggregatorFirstEvent), core.SubOptsFirstEventOldest)
viper.SetDefault(string(EventAggregatorBatchSize), 200)
- viper.SetDefault(string(EventAggregatorBatchTimeout), "250ms")
+ viper.SetDefault(string(EventAggregatorBatchTimeout), "0ms")
viper.SetDefault(string(EventAggregatorPollTimeout), "30s")
viper.SetDefault(string(EventAggregatorRewindTimeout), "50ms")
viper.SetDefault(string(EventAggregatorRewindQueueLength), 10)
@@ -414,7 +416,7 @@ func setDefaults() {
viper.SetDefault(string(EventAggregatorRetryMaxDelay), "30s")
viper.SetDefault(string(EventDBEventsBufferSize), 100)
viper.SetDefault(string(EventDispatcherBufferLength), 5)
- viper.SetDefault(string(EventDispatcherBatchTimeout), "250ms")
+ viper.SetDefault(string(EventDispatcherBatchTimeout), "0ms")
viper.SetDefault(string(EventDispatcherPollTimeout), "30s")
viper.SetDefault(string(EventTransportsEnabled), []string{"websockets", "webhooks"})
viper.SetDefault(string(EventTransportsDefault), "websockets")
@@ -451,7 +453,8 @@ func setDefaults() {
viper.SetDefault(string(PrivateMessagingBatchSize), 200)
viper.SetDefault(string(PrivateMessagingBatchTimeout), "1s")
viper.SetDefault(string(PrivateMessagingBatchPayloadLimit), "800Kb")
- viper.SetDefault(string(SubscriptionDefaultsReadAhead), 0)
+ viper.SetDefault(string(SubscriptionDefaultsBatchSize), 50)
+ viper.SetDefault(string(SubscriptionDefaultsBatchTimeout), "50ms")
viper.SetDefault(string(SubscriptionMax), 500)
viper.SetDefault(string(SubscriptionsRetryInitialDelay), "250ms")
viper.SetDefault(string(SubscriptionsRetryMaxDelay), "30s")
diff --git a/internal/coremsgs/en_config_descriptions.go b/internal/coremsgs/en_config_descriptions.go
index a148b5b2b..b766b72b6 100644
--- a/internal/coremsgs/en_config_descriptions.go
+++ b/internal/coremsgs/en_config_descriptions.go
@@ -1,4 +1,4 @@
-// Copyright © 2023 Kaleido, Inc.
+// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -383,8 +383,9 @@ var (
ConfigPluginSharedstorageIpfsGatewayURL = ffc("config.plugins.sharedstorage[].ipfs.gateway.url", "The URL for the IPFS Gateway", urlStringType)
ConfigPluginSharedstorageIpfsGatewayProxyURL = ffc("config.plugins.sharedstorage[].ipfs.gateway.proxy.url", "Optional HTTP proxy server to use when connecting to the IPFS Gateway", urlStringType)
- ConfigSubscriptionMax = ffc("config.subscription.max", "The maximum number of pre-defined subscriptions that can exist (note for high fan-out consider connecting a dedicated pub/sub broker to the dispatcher)", i18n.IntType)
- ConfigSubscriptionDefaultsBatchSize = ffc("config.subscription.defaults.batchSize", "Default read ahead to enable for subscriptions that do not explicitly configure readahead", i18n.IntType)
+ ConfigSubscriptionMax = ffc("config.subscription.max", "The maximum number of pre-defined subscriptions that can exist (note for high fan-out consider connecting a dedicated pub/sub broker to the dispatcher)", i18n.IntType)
+ ConfigSubscriptionDefaultsBatchSize = ffc("config.subscription.defaults.batchSize", "Default read ahead to enable for subscriptions that do not explicitly configure readahead", i18n.IntType)
+	ConfigSubscriptionDefaultsBatchTimeout  = ffc("config.subscription.defaults.batchTimeout", "Default batch timeout for subscriptions that do not explicitly configure batchTimeout", i18n.TimeDurationType)
ConfigTokensName = ffc("config.tokens[].name", "A name to identify this token plugin", i18n.StringType)
ConfigTokensPlugin = ffc("config.tokens[].plugin", "The type of the token plugin to use", i18n.StringType)
diff --git a/internal/events/event_dispatcher.go b/internal/events/event_dispatcher.go
index a862d2dd5..8afd99e39 100644
--- a/internal/events/event_dispatcher.go
+++ b/internal/events/event_dispatcher.go
@@ -1,4 +1,4 @@
-// Copyright © 2023 Kaleido, Inc.
+// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -65,25 +65,21 @@ type eventDispatcher struct {
elected bool
eventPoller *eventPoller
inflight map[fftypes.UUID]*core.Event
- eventDelivery chan *core.EventDelivery
+ eventDelivery chan []*core.EventDelivery
mux sync.Mutex
namespace string
readAhead int
batch bool
- batchTimeout time.Duration
subscription *subscription
txHelper txcommon.Helper
}
func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events.Plugin, di database.Plugin, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager, connID string, sub *subscription, en *eventNotifier, txHelper txcommon.Helper) *eventDispatcher {
ctx, cancelCtx := context.WithCancel(ctx)
- readAhead := config.GetUint(coreconfig.SubscriptionDefaultsReadAhead)
+ readAhead := uint(0)
if sub.definition.Options.ReadAhead != nil {
readAhead = uint(*sub.definition.Options.ReadAhead)
}
- if readAhead > maxReadAhead {
- readAhead = maxReadAhead
- }
batchTimeout := defaultBatchTimeout
if sub.definition.Options.BatchTimeout != nil && *sub.definition.Options.BatchTimeout != "" {
@@ -108,13 +104,12 @@ func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events.
subscription: sub,
namespace: sub.definition.Namespace,
inflight: make(map[fftypes.UUID]*core.Event),
- eventDelivery: make(chan *core.EventDelivery, readAhead+1),
+ eventDelivery: make(chan []*core.EventDelivery, readAhead+1),
readAhead: int(readAhead),
acksNacks: make(chan ackNack),
closed: make(chan struct{}),
txHelper: txHelper,
batch: batch,
- batchTimeout: batchTimeout,
}
pollerConf := &eventPollerConf{
@@ -138,6 +133,20 @@ func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events.
firstEvent: sub.definition.Options.FirstEvent,
}
+ // Users can tune the batch-related settings.
+ // A batch timeout is always set in batch:true cases, and the batchTimeout setting can also be
+ // used without batching, to tune the trade-off between readahead throughput and delivery
+ // latency (users who need that are most likely better served by moving to batch:true).
+ if batchTimeout > 0 {
+ pollerConf.eventBatchTimeout = batchTimeout
+ if batchTimeout > pollerConf.eventPollTimeout {
+ pollerConf.eventPollTimeout = batchTimeout
+ }
+ }
+ if batch || pollerConf.eventBatchSize < int(readAhead) {
+ pollerConf.eventBatchSize = ed.readAhead
+ }
+
ed.eventPoller = newEventPoller(ctx, di, en, pollerConf)
return ed
}
@@ -165,11 +174,8 @@ func (ed *eventDispatcher) electAndStart() {
ed.elected = true
ed.eventPoller.start()
- if ed.batch {
- go ed.deliverBatchedEvents()
- } else {
- go ed.deliverEvents()
- }
+ go ed.deliverEvents()
+
// Wait until the event poller closes
<-ed.eventPoller.closed
}
@@ -326,7 +332,15 @@ func (ed *eventDispatcher) bufferedDelivery(events []core.LocallySequenced) (boo
ed.mux.Unlock()
dispatched++
- ed.eventDelivery <- event
+ if !ed.batch {
+ // dispatch individually
+ ed.eventDelivery <- []*core.EventDelivery{event}
+ }
+ }
+
+ if ed.batch && len(dispatchable) > 0 {
+ // Dispatch the whole batch now marked in-flight
+ ed.eventDelivery <- dispatchable
}
if inflightCount == 0 {
@@ -384,88 +398,59 @@ func (ed *eventDispatcher) handleAckOffsetUpdate(ack ackNack) {
}
}
-func (ed *eventDispatcher) deliverBatchedEvents() {
+func (ed *eventDispatcher) deliverEvents() {
withData := ed.subscription.definition.Options.WithData != nil && *ed.subscription.definition.Options.WithData
-
- var events []*core.CombinedEventDataDelivery
- var batchTimeoutContext context.Context
- var batchTimeoutCancel func()
for {
- var timeoutContext context.Context
- var timedOut bool
- if batchTimeoutContext != nil {
- timeoutContext = batchTimeoutContext
- } else {
- timeoutContext = ed.ctx
- }
select {
- case event, ok := <-ed.eventDelivery:
+ case events, ok := <-ed.eventDelivery:
if !ok {
- if batchTimeoutCancel != nil {
- batchTimeoutCancel()
- }
return
}
- if events == nil {
- events = []*core.CombinedEventDataDelivery{}
- batchTimeoutContext, batchTimeoutCancel = context.WithTimeout(ed.ctx, ed.batchTimeout)
- }
-
- log.L(ed.ctx).Debugf("Dispatching %s event in a batch: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), event.Sequence, event.ID, event.Type, event.Namespace, event.Reference)
-
- var data []*core.Data
+ // As soon as we hit an error, we need to trigger into nack mode
var err error
- if withData && event.Message != nil {
- data, _, err = ed.data.GetMessageDataCached(ed.ctx, event.Message)
- }
-
- events = append(events, &core.CombinedEventDataDelivery{Event: event, Data: data})
-
- if err != nil {
- ed.deliveryResponse(&core.EventDeliveryResponse{ID: event.ID, Rejected: true})
- }
-
- case <-timeoutContext.Done():
- timedOut = true
- case <-ed.ctx.Done():
- if batchTimeoutCancel != nil {
- batchTimeoutCancel()
- }
- return
- }
- if len(events) == ed.readAhead || (timedOut && len(events) > 0) {
- _ = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, events)
- // If err handle all the delivery responses for all the events??
- events = nil
- }
- }
-}
-
-// TODO issue here, we can't just call DeliveryRequest with one thing.
-func (ed *eventDispatcher) deliverEvents() {
- withData := ed.subscription.definition.Options.WithData != nil && *ed.subscription.definition.Options.WithData
- for {
- select {
- case event, ok := <-ed.eventDelivery:
- if !ok {
- return
+ // Loop through the events enriching them, and dispatching individually in non-batch mode
+ eventsWithData := make([]*core.CombinedEventDataDelivery, len(events))
+ for i := 0; i < len(events); i++ {
+ e := &core.CombinedEventDataDelivery{
+ Event: events[i],
+ }
+ eventsWithData[i] = e
+ // The first error we encounter stops us attempting to enrich or dispatch any more events
+ if err == nil {
+ log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), e.Event.Sequence, e.Event.ID, e.Event.Type, e.Event.Namespace, e.Event.Reference)
+ if withData && e.Event.Message != nil {
+ e.Data, _, err = ed.data.GetMessageDataCached(ed.ctx, e.Event.Message)
+ }
+ }
+ // If we are non-batched, we have to deliver each event individually...
+ if !ed.batch {
+ // .. only attempt to deliver if we've not triggered into an error scenario for one of the events already
+ if err == nil {
+ err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, e.Event, e.Data)
+ }
+ // ... if we've triggered into an error scenario, we need to nack immediately for this and all the rest of the events
+ if err != nil {
+ ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true})
+ }
+ }
}
- log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), event.Sequence, event.ID, event.Type, event.Namespace, event.Reference)
- var data []*core.Data
- var err error
- if withData && event.Message != nil {
- data, _, err = ed.data.GetMessageDataCached(ed.ctx, event.Message)
+ // In batch mode we do one dispatch of the whole set as one
+ if ed.batch {
+ // Only attempt to deliver if we're in a non error case (enrich might have failed above)
+ if err == nil {
+ err = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, eventsWithData)
+ }
+ // If we're in an error case we have to nack everything immediately
+ if err != nil {
+ for _, e := range events {
+ ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true})
+ }
+ }
}
- if err == nil {
- err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, event, data)
- }
- if err != nil {
- ed.deliveryResponse(&core.EventDeliveryResponse{ID: event.ID, Rejected: true})
- }
case <-ed.ctx.Done():
return
}
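The dispatcher rework above changes the contract with transport plugins: in batch mode the whole in-flight set is handed to `BatchDeliveryRequest` in a single call, and any error from enrichment or delivery causes every event in the batch to be rejected for redelivery. A minimal sketch of that contract from the plugin side (the `sketchTransport` type, its `connections` map, and the `batchConn` interface are hypothetical; only the `core` types are real):

```go
package sketch

import (
	"context"
	"fmt"

	"github.com/hyperledger/firefly/pkg/core"
)

type batchConn interface {
	sendBatch(ctx context.Context, events []*core.CombinedEventDataDelivery) error
}

type sketchTransport struct {
	connections map[string]batchConn
}

func (t *sketchTransport) BatchDeliveryRequest(ctx context.Context, connID string, sub *core.Subscription, events []*core.CombinedEventDataDelivery) error {
	conn, ok := t.connections[connID]
	if !ok {
		// Any error returned here makes the dispatcher nack (reject) every
		// event in the batch, so all of them are redelivered later
		return fmt.Errorf("connection %s not active", connID)
	}
	// Return nil only once the whole batch is handed off; acks then flow
	// back per event via the DeliveryResponse callback
	return conn.sendBatch(ctx, events)
}
```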
diff --git a/internal/events/event_dispatcher_test.go b/internal/events/event_dispatcher_test.go
index 2f592c92d..a056c3251 100644
--- a/internal/events/event_dispatcher_test.go
+++ b/internal/events/event_dispatcher_test.go
@@ -23,7 +23,6 @@ import (
"testing"
"time"
- "github.com/hyperledger/firefly-common/pkg/config"
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly/internal/cache"
@@ -131,20 +130,6 @@ func TestEventDispatcherStartStopBatched(t *testing.T) {
ed.close()
}
-func TestMaxReadAhead(t *testing.T) {
- config.Set(coreconfig.SubscriptionDefaultsReadAhead, 65537)
- ed, cancel := newTestEventDispatcher(&subscription{
- dispatcherElection: make(chan bool, 1),
- definition: &core.Subscription{
- SubscriptionRef: core.SubscriptionRef{Namespace: "ns1", Name: "sub1"},
- Ephemeral: true,
- Options: core.SubscriptionOptions{},
- },
- })
- defer cancel()
- assert.Equal(t, int(65536), ed.readAhead)
-}
-
func TestEventDispatcherLeaderElection(t *testing.T) {
log.SetLevel("debug")
@@ -377,6 +362,171 @@ func TestEventDispatcherNoReadAheadInOrder(t *testing.T) {
mdm.AssertExpectations(t)
}
+func TestEventDispatcherBatchBased(t *testing.T) {
+ log.SetLevel("debug")
+ three := uint16(3)
+ longTime := "1m"
+ subID := fftypes.NewUUID()
+ truthy := true
+ sub := &subscription{
+ dispatcherElection: make(chan bool, 1),
+ definition: &core.Subscription{
+ SubscriptionRef: core.SubscriptionRef{ID: subID, Namespace: "ns1", Name: "sub1"},
+ Options: core.SubscriptionOptions{
+ SubscriptionCoreOptions: core.SubscriptionCoreOptions{
+ Batch: &truthy,
+ ReadAhead: &three,
+ BatchTimeout: &longTime, // because the batch should fill
+ },
+ },
+ },
+ eventMatcher: regexp.MustCompile(fmt.Sprintf("^%s|%s$", core.EventTypeMessageConfirmed, core.EventTypeMessageConfirmed)),
+ }
+
+ ed, cancel := newTestEventDispatcher(sub)
+ defer cancel()
+ go ed.deliverEvents()
+ ed.eventPoller.offsetCommitted = make(chan int64, 3)
+ mdi := ed.database.(*databasemocks.Plugin)
+ mei := ed.transport.(*eventsmocks.Plugin)
+ mdm := ed.data.(*datamocks.Manager)
+
+ eventDeliveries := make(chan []*core.CombinedEventDataDelivery)
+	deliveryRequestMock := mei.On("BatchDeliveryRequest", ed.ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+ deliveryRequestMock.RunFn = func(a mock.Arguments) {
+ eventDeliveries <- a.Get(3).([]*core.CombinedEventDataDelivery)
+ }
+
+ // Setup the IDs
+ ref1 := fftypes.NewUUID()
+ ev1 := fftypes.NewUUID()
+ ref2 := fftypes.NewUUID()
+ ev2 := fftypes.NewUUID()
+ ref3 := fftypes.NewUUID()
+ ev3 := fftypes.NewUUID()
+ ref4 := fftypes.NewUUID()
+ ev4 := fftypes.NewUUID()
+
+ // Setup enrichment
+ mdm.On("GetMessageWithDataCached", mock.Anything, ref1).Return(&core.Message{
+ Header: core.MessageHeader{ID: ref1},
+ }, nil, true, nil)
+ mdm.On("GetMessageWithDataCached", mock.Anything, ref2).Return(&core.Message{
+ Header: core.MessageHeader{ID: ref2},
+ }, nil, true, nil)
+ mdm.On("GetMessageWithDataCached", mock.Anything, ref3).Return(&core.Message{
+ Header: core.MessageHeader{ID: ref3},
+ }, nil, true, nil)
+ mdm.On("GetMessageWithDataCached", mock.Anything, ref4).Return(&core.Message{
+ Header: core.MessageHeader{ID: ref4},
+ }, nil, true, nil)
+
+ // Deliver a batch of messages
+ batch1Done := make(chan struct{})
+ go func() {
+ repoll, err := ed.bufferedDelivery([]core.LocallySequenced{
+ &core.Event{ID: ev1, Sequence: 10000001, Reference: ref1, Type: core.EventTypeMessageConfirmed}, // match
+ &core.Event{ID: ev2, Sequence: 10000002, Reference: ref2, Type: core.EventTypeMessageRejected},
+ &core.Event{ID: ev3, Sequence: 10000003, Reference: ref3, Type: core.EventTypeMessageConfirmed}, // match
+ &core.Event{ID: ev4, Sequence: 10000004, Reference: ref4, Type: core.EventTypeMessageConfirmed}, // match
+ })
+ assert.NoError(t, err)
+ assert.True(t, repoll)
+ close(batch1Done)
+ }()
+
+ // Expect to get the batch dispatched - with the three matching events
+ events := <-eventDeliveries
+ assert.Len(t, events, 3)
+ assert.Equal(t, *ev1, *events[0].Event.ID)
+ assert.Equal(t, *ref1, *events[0].Event.Message.Header.ID)
+ assert.Equal(t, *ev3, *events[1].Event.ID)
+ assert.Equal(t, *ref3, *events[1].Event.Message.Header.ID)
+ assert.Equal(t, *ev4, *events[2].Event.ID)
+ assert.Equal(t, *ref4, *events[2].Event.Message.Header.ID)
+
+ // Ack the batch
+ go func() {
+ ed.deliveryResponse(&core.EventDeliveryResponse{ID: events[0].Event.ID})
+ ed.deliveryResponse(&core.EventDeliveryResponse{ID: events[1].Event.ID})
+ ed.deliveryResponse(&core.EventDeliveryResponse{ID: events[2].Event.ID})
+ }()
+
+ assert.Equal(t, int64(10000001), <-ed.eventPoller.offsetCommitted)
+ assert.Equal(t, int64(10000003), <-ed.eventPoller.offsetCommitted)
+ assert.Equal(t, int64(10000004), <-ed.eventPoller.offsetCommitted)
+
+ // This should complete the batch
+ <-batch1Done
+
+ mdi.AssertExpectations(t)
+ mei.AssertExpectations(t)
+ mdm.AssertExpectations(t)
+}
+
+func TestEventDispatcherBatchDispatchFail(t *testing.T) {
+ log.SetLevel("debug")
+ two := uint16(2)
+ longTime := "1m"
+ subID := fftypes.NewUUID()
+ truthy := true
+ sub := &subscription{
+ dispatcherElection: make(chan bool, 1),
+ definition: &core.Subscription{
+ SubscriptionRef: core.SubscriptionRef{ID: subID, Namespace: "ns1", Name: "sub1"},
+ Options: core.SubscriptionOptions{
+ SubscriptionCoreOptions: core.SubscriptionCoreOptions{
+ Batch: &truthy,
+ ReadAhead: &two,
+ BatchTimeout: &longTime, // because the batch should fill
+ },
+ },
+ },
+ eventMatcher: regexp.MustCompile(fmt.Sprintf("^%s|%s$", core.EventTypeMessageConfirmed, core.EventTypeMessageConfirmed)),
+ }
+
+ ed, cancel := newTestEventDispatcher(sub)
+ defer cancel()
+ go ed.deliverEvents()
+ ed.eventPoller.offsetCommitted = make(chan int64, 3)
+ mdi := ed.database.(*databasemocks.Plugin)
+ mei := ed.transport.(*eventsmocks.Plugin)
+ mdm := ed.data.(*datamocks.Manager)
+
+ eventDeliveries := make(chan []*core.CombinedEventDataDelivery)
+	deliveryRequestMock := mei.On("BatchDeliveryRequest", ed.ctx, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))
+ deliveryRequestMock.RunFn = func(a mock.Arguments) {
+ eventDeliveries <- a.Get(3).([]*core.CombinedEventDataDelivery)
+ }
+
+ // Deliver a batch of messages
+ batch1Done := make(chan struct{})
+ go func() {
+ repoll, err := ed.bufferedDelivery([]core.LocallySequenced{
+ &core.Event{ID: fftypes.NewUUID(), Sequence: 10000001, Type: core.EventTypeMessageConfirmed},
+ &core.Event{ID: fftypes.NewUUID(), Sequence: 10000002, Type: core.EventTypeMessageConfirmed},
+ })
+ assert.NoError(t, err)
+ assert.True(t, repoll)
+ close(batch1Done)
+ }()
+
+ mdm.On("GetMessageWithDataCached", mock.Anything, mock.Anything).Return(&core.Message{
+ Header: core.MessageHeader{ID: fftypes.NewUUID()},
+ }, nil, true, nil)
+
+ // Expect to get the batch dispatched - with the two events
+ events := <-eventDeliveries
+ assert.Len(t, events, 2)
+
+ // This should complete the batch
+ <-batch1Done
+
+ mdi.AssertExpectations(t)
+ mei.AssertExpectations(t)
+ mdm.AssertExpectations(t)
+}
+
func TestEnrichEventsFailGetMessages(t *testing.T) {
sub := &subscription{
@@ -966,21 +1116,6 @@ func TestEventDeliveryClosed(t *testing.T) {
cancel()
}
-func TestBatchEventDeliveryClosed(t *testing.T) {
-
- sub := &subscription{
- definition: &core.Subscription{},
- }
- ed, cancel := newTestEventDispatcher(sub)
- defer cancel()
-
- ed.batchTimeout = 1 * time.Minute
- ed.eventDelivery <- &core.EventDelivery{}
- close(ed.eventDelivery)
-
- ed.deliverBatchedEvents()
-}
-
func TestAckClosed(t *testing.T) {
sub := &subscription{
@@ -1039,17 +1174,19 @@ func TestDeliverEventsWithDataFail(t *testing.T) {
mdm.On("GetMessageDataCached", ed.ctx, mock.Anything).Return(nil, false, fmt.Errorf("pop"))
id1 := fftypes.NewUUID()
- ed.eventDelivery <- &core.EventDelivery{
- EnrichedEvent: core.EnrichedEvent{
- Event: core.Event{
- ID: id1,
- },
- Message: &core.Message{
- Header: core.MessageHeader{
- ID: fftypes.NewUUID(),
+ ed.eventDelivery <- []*core.EventDelivery{
+ {
+ EnrichedEvent: core.EnrichedEvent{
+ Event: core.Event{
+ ID: id1,
},
- Data: core.DataRefs{
- {ID: fftypes.NewUUID()},
+ Message: &core.Message{
+ Header: core.MessageHeader{
+ ID: fftypes.NewUUID(),
+ },
+ Data: core.DataRefs{
+ {ID: fftypes.NewUUID()},
+ },
},
},
},
@@ -1166,154 +1303,3 @@ func TestEventDeliveryBatch(t *testing.T) {
mbm.AssertExpectations(t)
mms.AssertExpectations(t)
}
-
-func TestEventDispatcherBatchReadAhead(t *testing.T) {
- log.SetLevel("debug")
- var five = uint16(5)
- subID := fftypes.NewUUID()
- truthy := true
- oneSec := "1s"
- sub := &subscription{
- dispatcherElection: make(chan bool, 1),
- definition: &core.Subscription{
- SubscriptionRef: core.SubscriptionRef{ID: subID, Namespace: "ns1", Name: "sub1"},
- Options: core.SubscriptionOptions{
- SubscriptionCoreOptions: core.SubscriptionCoreOptions{
- ReadAhead: &five,
- Batch: &truthy,
- BatchTimeout: &oneSec,
- },
- },
- },
- eventMatcher: regexp.MustCompile(fmt.Sprintf("^%s|%s$", core.EventTypeMessageConfirmed, core.EventTypeMessageConfirmed)),
- }
-
- ed, cancel := newTestEventDispatcher(sub)
- defer cancel()
- go ed.deliverBatchedEvents()
- ed.eventPoller.offsetCommitted = make(chan int64, 3)
- mdi := ed.database.(*databasemocks.Plugin)
- mei := ed.transport.(*eventsmocks.Plugin)
- mdm := ed.data.(*datamocks.Manager)
-
- eventDeliveries := make(chan *core.EventDelivery)
- deliveryRequestMock := mei.On("BatchDeliveryRequest", ed.ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil)
- deliveryRequestMock.RunFn = func(a mock.Arguments) {
- batchEvents := a.Get(3).([]*core.CombinedEventDataDelivery)
- for _, event := range batchEvents {
- eventDeliveries <- event.Event
- }
- }
-
- // Setup the IDs
- ref1 := fftypes.NewUUID()
- ev1 := fftypes.NewUUID()
- ref2 := fftypes.NewUUID()
- ev2 := fftypes.NewUUID()
- ref3 := fftypes.NewUUID()
- ev3 := fftypes.NewUUID()
- ref4 := fftypes.NewUUID()
- ev4 := fftypes.NewUUID()
-
- // Setup enrichment
- mdm.On("GetMessageWithDataCached", mock.Anything, ref1).Return(&core.Message{
- Header: core.MessageHeader{ID: ref1},
- }, nil, true, nil)
- mdm.On("GetMessageWithDataCached", mock.Anything, ref2).Return(&core.Message{
- Header: core.MessageHeader{ID: ref2},
- }, nil, true, nil)
- mdm.On("GetMessageWithDataCached", mock.Anything, ref3).Return(&core.Message{
- Header: core.MessageHeader{ID: ref3},
- }, nil, true, nil)
- mdm.On("GetMessageWithDataCached", mock.Anything, ref4).Return(&core.Message{
- Header: core.MessageHeader{ID: ref4},
- }, nil, true, nil)
-
- // Deliver a batch of messages
- batch1Done := make(chan struct{})
- go func() {
- repoll, err := ed.bufferedDelivery([]core.LocallySequenced{
- &core.Event{ID: ev1, Sequence: 10000001, Reference: ref1, Type: core.EventTypeMessageConfirmed}, // match
- &core.Event{ID: ev2, Sequence: 10000002, Reference: ref2, Type: core.EventTypeMessageRejected},
- &core.Event{ID: ev3, Sequence: 10000003, Reference: ref3, Type: core.EventTypeMessageConfirmed}, // match
- &core.Event{ID: ev4, Sequence: 10000004, Reference: ref4, Type: core.EventTypeMessageConfirmed}, // match
- })
- assert.NoError(t, err)
- assert.True(t, repoll)
- close(batch1Done)
- }()
-
- // Wait for the two calls to deliver the matching messages to the client (read ahead allows this)
- event1 := <-eventDeliveries
- assert.Equal(t, *ev1, *event1.ID)
- assert.Equal(t, *ref1, *event1.Message.Header.ID)
- event3 := <-eventDeliveries
- assert.Equal(t, *ev3, *event3.ID)
- assert.Equal(t, *ref3, *event3.Message.Header.ID)
- event4 := <-eventDeliveries
- assert.Equal(t, *ev4, *event4.ID)
- assert.Equal(t, *ref4, *event4.Message.Header.ID)
-
- // Send back the two acks - out of order to validate the read-ahead logic
- go func() {
- ed.deliveryResponse(&core.EventDeliveryResponse{ID: event4.ID})
- ed.deliveryResponse(&core.EventDeliveryResponse{ID: event1.ID})
- ed.deliveryResponse(&core.EventDeliveryResponse{ID: event3.ID})
- }()
-
- // Confirm we get the offset updates in the correct order, even though the confirmations
- // came in a different order from the app.
- assert.Equal(t, int64(10000001), <-ed.eventPoller.offsetCommitted)
- assert.Equal(t, int64(10000003), <-ed.eventPoller.offsetCommitted)
- assert.Equal(t, int64(10000004), <-ed.eventPoller.offsetCommitted)
-
- // This should complete the batch
- <-batch1Done
-
- mdi.AssertExpectations(t)
- mei.AssertExpectations(t)
- mdm.AssertExpectations(t)
-}
-
-func TestBatchDeliverEventsWithDataFail(t *testing.T) {
- yes := true
- sub := &subscription{
- definition: &core.Subscription{
- Options: core.SubscriptionOptions{
- SubscriptionCoreOptions: core.SubscriptionCoreOptions{
- WithData: &yes,
- },
- },
- },
- }
-
- ed, cancel := newTestEventDispatcher(sub)
- defer cancel()
-
- mdm := ed.data.(*datamocks.Manager)
- mdm.On("GetMessageDataCached", ed.ctx, mock.Anything).Return(nil, false, fmt.Errorf("pop"))
-
- id1 := fftypes.NewUUID()
- ed.eventDelivery <- &core.EventDelivery{
- EnrichedEvent: core.EnrichedEvent{
- Event: core.Event{
- ID: id1,
- },
- Message: &core.Message{
- Header: core.MessageHeader{
- ID: fftypes.NewUUID(),
- },
- Data: core.DataRefs{
- {ID: fftypes.NewUUID()},
- },
- },
- },
- }
-
- ed.inflight[*id1] = &core.Event{ID: id1}
- go ed.deliverBatchedEvents()
-
- an := <-ed.acksNacks
- assert.True(t, an.isNack)
-
-}
diff --git a/internal/events/event_poller.go b/internal/events/event_poller.go
index 9d876b6c8..cd509cc20 100644
--- a/internal/events/event_poller.go
+++ b/internal/events/event_poller.go
@@ -1,4 +1,4 @@
-// Copyright © 2023 Kaleido, Inc.
+// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -196,7 +196,12 @@ func (ep *eventPoller) eventLoop() {
close(ep.offsetCommitted)
}()
+ doBatchDelay := false
for {
+ if doBatchDelay {
+ ep.waitForBatchTimeout()
+ }
+
// Read messages from the DB - in an error condition we retry until success, or a closed context
events, err := ep.readPage()
if err != nil {
@@ -205,6 +210,15 @@ func (ep *eventPoller) eventLoop() {
}
eventCount := len(events)
+
+ // We might want to wait for the batch to fill - so we delay and re-poll
+ if ep.conf.eventBatchTimeout > 0 && !doBatchDelay && eventCount < ep.conf.eventBatchSize {
+ doBatchDelay = true
+ l.Tracef("Batch delay: detected=%d, batchSize=%d batchTimeout=%s", eventCount, ep.conf.eventBatchSize, ep.conf.eventBatchTimeout)
+ continue
+ }
+ doBatchDelay = false // clear any batch delay for next iteration
+
repoll := false
if eventCount > 0 {
// We process all the events in the page in a single database run group, and
@@ -280,6 +294,16 @@ func (ep *eventPoller) shoulderTap() {
}
}
+func (ep *eventPoller) waitForBatchTimeout() {
+ // For throughput optimized environments, we can set an eventBatchTimeout that delays
+ // re-polling so an incomplete batch can fill, dispatching after a much shorter wait
+ // than the long timeout between polling cycles (at the cost of some dispatch latency)
+ select {
+ case <-time.After(ep.conf.eventBatchTimeout):
+ case <-ep.ctx.Done():
+ }
+}
+
func (ep *eventPoller) waitForShoulderTapOrPollTimeout(lastEventCount int) bool {
l := log.L(ep.ctx)
longTimeoutDuration := ep.conf.eventPollTimeout
@@ -289,20 +313,6 @@ func (ep *eventPoller) waitForShoulderTapOrPollTimeout(lastEventCount int) bool
return true
}
- // For throughput optimized environments, we can set an eventBatchingTimeout to allow messages to arrive
- // between polling cycles (at the cost of some dispatch latency)
- if ep.conf.eventBatchTimeout > 0 && lastEventCount > 0 {
- shortTimeout := time.NewTimer(ep.conf.eventBatchTimeout)
- select {
- case <-shortTimeout.C:
- l.Tracef("Woken after batch timeout")
- case <-ep.ctx.Done():
- l.Debugf("Exiting due to cancelled context")
- return false
- }
- longTimeoutDuration -= ep.conf.eventBatchTimeout
- }
-
longTimeout := time.NewTimer(longTimeoutDuration)
select {
case <-longTimeout.C:
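The net effect of the poller change is easier to see outside the diff: instead of sleeping after a full page was dispatched, the loop now delays at most once per page, before dispatch, whenever batching is on and the page came back short. A distilled, runnable sketch of that control flow (with `readPage` and `dispatch` standing in for the real DB read and delivery, and the shoulder-tap wait omitted):

```go
package sketch

import (
	"context"
	"time"
)

// pollLoop mirrors the new eventLoop batching behaviour: when a page comes
// back smaller than batchSize, wait batchTimeout exactly once and re-read,
// then dispatch whatever arrived - full batch or not.
func pollLoop(ctx context.Context, batchSize int, batchTimeout time.Duration,
	readPage func() []int, dispatch func([]int)) {
	doBatchDelay := false
	for ctx.Err() == nil {
		if doBatchDelay {
			select { // the equivalent of waitForBatchTimeout
			case <-time.After(batchTimeout):
			case <-ctx.Done():
				return
			}
		}
		events := readPage()
		if batchTimeout > 0 && !doBatchDelay && len(events) < batchSize {
			doBatchDelay = true // delay at most once per page
			continue
		}
		doBatchDelay = false // clear the delay for the next iteration
		dispatch(events)     // a partial batch still dispatches after the delay
	}
}
```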
diff --git a/internal/events/event_poller_test.go b/internal/events/event_poller_test.go
index 8215d2c8a..de080a5a2 100644
--- a/internal/events/event_poller_test.go
+++ b/internal/events/event_poller_test.go
@@ -36,7 +36,7 @@ func newTestEventPoller(t *testing.T, mdi *databasemocks.Plugin, neh newEventsHa
ctx, cancel := context.WithCancel(context.Background())
ep = newEventPoller(ctx, mdi, newEventNotifier(ctx, "ut"), &eventPollerConf{
eventBatchSize: 10,
- eventBatchTimeout: 1 * time.Millisecond,
+ eventBatchTimeout: 0, // customized for individual tests that enable this
eventPollTimeout: 10 * time.Second,
startupOffsetRetryAttempts: 1,
retry: retry.Retry{
@@ -219,6 +219,57 @@ func TestReadPageSingleCommitEvent(t *testing.T) {
mdi.AssertExpectations(t)
}
+func TestReadPageBatchTimeoutNotFull(t *testing.T) {
+ mdi := &databasemocks.Plugin{}
+ processEventCalled := make(chan []core.LocallySequenced, 1)
+ ep, cancel := newTestEventPoller(t, mdi, func(events []core.LocallySequenced) (bool, error) {
+ processEventCalled <- events
+ return false, nil
+ }, nil)
+ ep.conf.eventBatchTimeout = 1 * time.Microsecond
+ ep.conf.eventBatchSize = 3
+ ev1 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "")
+ ev2 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "")
+ mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return([]*core.Event{ev1}, nil, nil).Once() // half batch
+ mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return([]*core.Event{ev1, ev2}, nil, nil).Run(func(args mock.Arguments) {
+ ep.shoulderTap()
+ }).Once()
+ mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return(nil, nil, fmt.Errorf("context done")).Run(func(args mock.Arguments) {
+ cancel()
+ })
+ ep.eventLoop()
+
+ events := <-processEventCalled
+ assert.Equal(t, *ev1.ID, *events[0].(*core.Event).ID)
+ assert.Equal(t, *ev2.ID, *events[1].(*core.Event).ID)
+ mdi.AssertExpectations(t)
+}
+
+func TestReadPageBatchFull(t *testing.T) {
+ mdi := &databasemocks.Plugin{}
+ processEventCalled := make(chan []core.LocallySequenced, 1)
+ ep, cancel := newTestEventPoller(t, mdi, func(events []core.LocallySequenced) (bool, error) {
+ processEventCalled <- events
+ return false, nil
+ }, nil)
+ ep.conf.eventBatchTimeout = 1 * time.Microsecond
+ ep.conf.eventBatchSize = 2
+ ev1 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "")
+ ev2 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "")
+ mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return([]*core.Event{ev1, ev2}, nil, nil).Run(func(args mock.Arguments) {
+ ep.shoulderTap()
+ }).Once()
+ mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return(nil, nil, fmt.Errorf("context done")).Run(func(args mock.Arguments) {
+ cancel()
+ })
+ ep.eventLoop()
+
+ events := <-processEventCalled
+ assert.Equal(t, *ev1.ID, *events[0].(*core.Event).ID)
+ assert.Equal(t, *ev2.ID, *events[1].(*core.Event).ID)
+ mdi.AssertExpectations(t)
+}
+
func TestReadPageRewind(t *testing.T) {
mdi := &databasemocks.Plugin{}
processEventCalled := make(chan core.LocallySequenced, 1)
@@ -325,6 +376,14 @@ func TestDoubleTap(t *testing.T) {
ep.shoulderTap() // this should not block
}
+func TestWaitForBatchTimeoutClosedContext(t *testing.T) {
+ mdi := &databasemocks.Plugin{}
+ ep, cancel := newTestEventPoller(t, mdi, nil, nil)
+ ep.conf.eventBatchTimeout = 1 * time.Minute
+ cancel()
+ ep.waitForBatchTimeout()
+}
+
func TestDoubleConfirm(t *testing.T) {
mdi := &databasemocks.Plugin{}
ep, cancel := newTestEventPoller(t, mdi, nil, nil)
diff --git a/internal/events/subscription_manager.go b/internal/events/subscription_manager.go
index 788e72ec3..928e3789e 100644
--- a/internal/events/subscription_manager.go
+++ b/internal/events/subscription_manager.go
@@ -1,4 +1,4 @@
-// Copyright © 2023 Kaleido, Inc.
+// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -20,6 +20,7 @@ import (
"context"
"regexp"
"sync"
+ "time"
"github.com/hyperledger/firefly-common/pkg/config"
"github.com/hyperledger/firefly-common/pkg/fftypes"
@@ -90,6 +91,9 @@ type subscriptionManager struct {
newOrUpdatedSubscriptions chan *fftypes.UUID
deletedSubscriptions chan *fftypes.UUID
retry retry.Retry
+
+ defaultBatchSize uint16
+ defaultBatchTimeout time.Duration
}
func newSubscriptionManager(ctx context.Context, ns *core.Namespace, enricher *eventEnricher, di database.Plugin, dm data.Manager, en *eventNotifier, bm broadcast.Manager, pm privatemessaging.Manager, txHelper txcommon.Helper, transports map[string]events.Plugin) (*subscriptionManager, error) {
@@ -116,6 +120,8 @@ func newSubscriptionManager(ctx context.Context, ns *core.Namespace, enricher *e
MaximumDelay: config.GetDuration(coreconfig.SubscriptionsRetryMaxDelay),
Factor: config.GetFloat64(coreconfig.SubscriptionsRetryFactor),
},
+ defaultBatchSize: uint16(config.GetInt(coreconfig.SubscriptionDefaultsBatchSize)),
+ defaultBatchTimeout: config.GetDuration(coreconfig.SubscriptionDefaultsBatchTimeout),
}
for _, ei := range sm.transports {
@@ -270,6 +276,18 @@ func (sm *subscriptionManager) parseSubscriptionDef(ctx context.Context, subDef
subDef.Options.TLSConfig = sm.namespace.TLSConfigs[subDef.Options.TLSConfigName]
}
+ // Defaults that only apply in batch mode
+ if subDef.Options.Batch != nil && *subDef.Options.Batch {
+ if subDef.Options.ReadAhead == nil || *subDef.Options.ReadAhead == 0 {
+ defaultBatchSize := sm.defaultBatchSize
+ subDef.Options.ReadAhead = &defaultBatchSize
+ }
+ if subDef.Options.BatchTimeout == nil || *subDef.Options.BatchTimeout == "" {
+ defaultBatchTimeout := sm.defaultBatchTimeout.String()
+ subDef.Options.BatchTimeout = &defaultBatchTimeout
+ }
+ }
+
if err := transport.ValidateOptions(ctx, &subDef.Options); err != nil {
return nil, err
}
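The effect of this defaulting is what TestCreateSubscriptionSuccessBatch below asserts: a subscription created with only `batch` set, e.g. (a sketch assuming the standard JSON field names for `SubscriptionCoreOptions`):

```json
{
  "name": "sub1",
  "transport": "websockets",
  "options": {
    "batch": true
  }
}
```

comes out of `parseSubscriptionDef` with options equivalent to `{"batch": true, "readahead": 50, "batchTimeout": "50ms"}`.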
diff --git a/internal/events/subscription_manager_test.go b/internal/events/subscription_manager_test.go
index d72b8d111..ef509c705 100644
--- a/internal/events/subscription_manager_test.go
+++ b/internal/events/subscription_manager_test.go
@@ -551,6 +551,30 @@ func TestCreateSubscriptionSuccessTLSConfig(t *testing.T) {
assert.NotNil(t, sub.definition.Options.TLSConfig)
}
+func TestCreateSubscriptionSuccessBatch(t *testing.T) {
+ coreconfig.Reset()
+
+ mei := &eventsmocks.Plugin{}
+ sm, cancel := newTestSubManager(t, mei)
+ defer cancel()
+
+ mei.On("GetFFRestyConfig", mock.Anything).Return(&ffresty.Config{})
+ mei.On("ValidateOptions", mock.Anything, mock.Anything).Return(nil)
+ truthy := true
+ sub, err := sm.parseSubscriptionDef(sm.ctx, &core.Subscription{
+ Options: core.SubscriptionOptions{
+ SubscriptionCoreOptions: core.SubscriptionCoreOptions{
+ Batch: &truthy,
+ },
+ },
+ Transport: "ut",
+ })
+ assert.NoError(t, err)
+
+ assert.Equal(t, uint16(50), *sub.definition.Options.ReadAhead)
+ assert.Equal(t, "50ms", *sub.definition.Options.BatchTimeout)
+}
+
func TestCreateSubscriptionWithDeprecatedFilters(t *testing.T) {
mei := &eventsmocks.Plugin{}
sm, cancel := newTestSubManager(t, mei)
diff --git a/internal/events/websockets/config.go b/internal/events/websockets/config.go
index 8ce482a87..c5d8df776 100644
--- a/internal/events/websockets/config.go
+++ b/internal/events/websockets/config.go
@@ -1,4 +1,4 @@
-// Copyright © 2022 Kaleido, Inc.
+// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -32,5 +32,4 @@ const (
func (ws *WebSockets) InitConfig(config config.Section) {
config.AddKnownKey(ReadBufferSize, bufferSizeDefault)
config.AddKnownKey(WriteBufferSize, bufferSizeDefault)
-
}
diff --git a/internal/events/websockets/websocket_connection.go b/internal/events/websockets/websocket_connection.go
index e72d1eff4..58de422fa 100644
--- a/internal/events/websockets/websocket_connection.go
+++ b/internal/events/websockets/websocket_connection.go
@@ -1,4 +1,4 @@
-// Copyright © 2023 Kaleido, Inc.
+// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -21,6 +21,8 @@ import (
"encoding/json"
"io"
"net/http"
+ "net/url"
+ "strconv"
"sync"
"time"
@@ -49,6 +51,7 @@ type websocketConnection struct {
autoAck bool
started []*websocketStartedSub
inflight []*core.EventDeliveryResponse
+ inflightBatches []*core.WSEventBatch
mux sync.Mutex
closed bool
remoteAddr string
@@ -94,21 +97,45 @@ func (wc *websocketConnection) assertNamespace(namespace string) (string, error)
return namespace, nil
}
+func isBoolQuerySet(query url.Values, boolOption string) bool {
+ optionValues, hasOptionValues := query[boolOption]
+ return hasOptionValues && (len(optionValues) == 0 || optionValues[0] != "false")
+}
+
+func (wc *websocketConnection) getReadAhead(query url.Values, isBatch bool) *uint16 {
+ readaheadStr := query.Get("readahead")
+ if readaheadStr != "" {
+ readAheadInt, err := strconv.ParseUint(readaheadStr, 10, 16)
+ if err == nil {
+ readahead := uint16(readAheadInt)
+ return &readahead
+ }
+ }
+ return nil
+}
+
+func (wc *websocketConnection) getBatchTimeout(query url.Values) *string {
+ batchTimeout := query.Get("batchtimeout")
+ if batchTimeout != "" {
+ return &batchTimeout
+ }
+ return nil
+}
+
// processAutoStart gives a helper to specify query parameters to auto-start your subscription
func (wc *websocketConnection) processAutoStart(req *http.Request) {
query := req.URL.Query()
- ephemeral, hasEphemeral := req.URL.Query()["ephemeral"]
- isEphemeral := hasEphemeral && (len(ephemeral) == 0 || ephemeral[0] != "false")
+ isEphemeral := isBoolQuerySet(query, "ephemeral")
_, hasName := query["name"]
- autoAck, hasAutoack := req.URL.Query()["autoack"]
- isAutoack := hasAutoack && (len(autoAck) == 0 || autoAck[0] != "false")
+ isAutoack := isBoolQuerySet(query, "autoack")
namespace, err := wc.assertNamespace(query.Get("namespace"))
if err != nil {
wc.protocolError(err)
return
}
- if hasEphemeral || hasName {
+ if isEphemeral || hasName {
+ isBatch := isBoolQuerySet(query, "batch")
filter := core.NewSubscriptionFilterFromQuery(query)
err := wc.handleStart(&core.WSStart{
AutoAck: &isAutoack,
@@ -116,6 +143,13 @@ func (wc *websocketConnection) processAutoStart(req *http.Request) {
Namespace: namespace,
Name: query.Get("name"),
Filter: filter,
+ Options: core.SubscriptionOptions{
+ SubscriptionCoreOptions: core.SubscriptionCoreOptions{
+ Batch: &isBatch,
+ BatchTimeout: wc.getBatchTimeout(query),
+ ReadAhead: wc.getReadAhead(query, isBatch),
+ },
+ },
})
if err != nil {
wc.protocolError(err)
@@ -231,6 +265,54 @@ func (wc *websocketConnection) dispatch(event *core.EventDelivery) error {
return nil
}
+func (wc *websocketConnection) dispatchBatch(sub *core.Subscription, events []*core.CombinedEventDataDelivery) error {
+ inflightBatch := &core.WSEventBatch{
+ Type: core.WSEventBatchType,
+ ID: fftypes.NewUUID(),
+ Events: make([]*core.EventDelivery, len(events)),
+ }
+ if sub != nil {
+ inflightBatch.Subscription = sub.SubscriptionRef
+ }
+ for i, e := range events {
+ // For ephemeral there's no sub, so we pick up from first event
+ if inflightBatch.Subscription.Namespace == "" {
+ inflightBatch.Subscription = e.Event.Subscription
+ }
+ inflightBatch.Events[i] = e.Event
+ }
+
+ var autoAck bool
+ wc.mux.Lock()
+ autoAck = wc.autoAck
+ if !autoAck {
+ wc.inflightBatches = append(wc.inflightBatches, inflightBatch)
+ }
+ wc.mux.Unlock()
+
+ err := wc.send(inflightBatch)
+ if err != nil {
+ return err
+ }
+
+ if autoAck {
+ wc.ackBatch(inflightBatch)
+ }
+
+ return nil
+}
+
+func (wc *websocketConnection) ackBatch(batch *core.WSEventBatch) {
+ for _, e := range batch.Events {
+ // We individually drive an ack back on each event, but do so in one pass
+ // (this matches the webhook implementation of batching).
+ wc.ws.ack(wc.connID, &core.EventDeliveryResponse{
+ ID: e.ID,
+ Subscription: batch.Subscription,
+ })
+ }
+}
+
func (wc *websocketConnection) protocolError(err error) {
log.L(wc.ctx).Errorf("Sending protocol error to client: %s", err)
sendErr := wc.send(&core.WSError{
@@ -309,6 +391,22 @@ func (wc *websocketConnection) durableSubMatcher(sr core.SubscriptionRef) bool {
return false
}
+func (wc *websocketConnection) handleBatchAck(ack *core.WSAck) (handled bool) {
+ wc.mux.Lock()
+ defer wc.mux.Unlock()
+ var newInflightBatches []*core.WSEventBatch
+ for _, batch := range wc.inflightBatches {
+ if batch.ID.Equals(ack.ID) { // nil safe check
+ wc.ackBatch(batch)
+ handled = true
+ } else {
+ newInflightBatches = append(newInflightBatches, batch)
+ }
+ }
+ wc.inflightBatches = newInflightBatches
+ return handled
+}
+
func (wc *websocketConnection) checkAck(ack *core.WSAck) (*core.EventDeliveryResponse, error) {
l := log.L(wc.ctx)
var inflight *core.EventDeliveryResponse
@@ -363,6 +461,10 @@ func (wc *websocketConnection) checkAck(ack *core.WSAck) (*core.EventDeliveryRes
}
func (wc *websocketConnection) handleAck(ack *core.WSAck) error {
+ if handled := wc.handleBatchAck(ack); handled {
+ return nil
+ }
+
// Perform a locked set of check
inflight, err := wc.checkAck(ack)
if err != nil {
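With the query-parameter plumbing above, the batch options can be set at connect time for auto-started subscriptions. An illustrative connect URL (host and port are placeholders; the parameters are the ones exercised by the auto-start tests below):

```
ws://localhost:5000/ws?namespace=ns1&ephemeral&autoack&batch&readahead=42&batchtimeout=1s
```

Note that a bare flag such as `batch` counts as true: `isBoolQuerySet` treats any present key as set unless its value is the literal `false`.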
diff --git a/internal/events/websockets/websockets.go b/internal/events/websockets/websockets.go
index a12a325dd..fe6dc9bf0 100644
--- a/internal/events/websockets/websockets.go
+++ b/internal/events/websockets/websockets.go
@@ -1,4 +1,4 @@
-// Copyright © 2023 Kaleido, Inc.
+// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -54,9 +54,11 @@ func (ws *WebSockets) Name() string { return "websockets" }
func (ws *WebSockets) Init(ctx context.Context, config config.Section) error {
*ws = WebSockets{
- ctx: ctx,
- connections: make(map[string]*websocketConnection),
- capabilities: &events.Capabilities{},
+ ctx: ctx,
+ connections: make(map[string]*websocketConnection),
+ capabilities: &events.Capabilities{
+ BatchDelivery: true,
+ },
callbacks: callbacks{
handlers: make(map[string]events.Callbacks),
},
@@ -242,6 +244,11 @@ func (ws *WebSockets) GetStatus() *core.WebSocketStatus {
}
func (ws *WebSockets) BatchDeliveryRequest(ctx context.Context, connID string, sub *core.Subscription, events []*core.CombinedEventDataDelivery) error {
- // We should have rejected creation of the subscription, due to us not supporting this in our capabilities
- return i18n.NewError(ctx, coremsgs.MsgBatchDeliveryNotSupported, ws.Name())
+ ws.connMux.Lock()
+ conn, ok := ws.connections[connID]
+ ws.connMux.Unlock()
+ if !ok {
+ return i18n.NewError(ctx, coremsgs.MsgWSConnectionNotActive, connID)
+ }
+ return conn.dispatchBatch(sub, events)
}
diff --git a/internal/events/websockets/websockets_test.go b/internal/events/websockets/websockets_test.go
index e2f837988..cba25ebfa 100644
--- a/internal/events/websockets/websockets_test.go
+++ b/internal/events/websockets/websockets_test.go
@@ -236,6 +236,68 @@ func TestStartReceiveAckEphemeral(t *testing.T) {
cbs.AssertExpectations(t)
}
+func TestAutoAckBatch(t *testing.T) {
+ log.SetLevel("trace")
+
+ cbs := &eventsmocks.Callbacks{}
+ ws, wsc, cancel := newTestWebsockets(t, cbs, nil, "autoack=true")
+ defer cancel()
+ var connID string
+ mes := cbs.On("EphemeralSubscription",
+ mock.MatchedBy(func(s string) bool { connID = s; return true }),
+ "ns1", mock.Anything, mock.MatchedBy(func(o *core.SubscriptionOptions) bool {
+ return *o.Batch
+ })).Return(nil)
+ ack := cbs.On("DeliveryResponse",
+ mock.MatchedBy(func(s string) bool { return s == connID }),
+ mock.Anything).Return(nil)
+
+ waitSubscribed := make(chan struct{})
+ mes.RunFn = func(a mock.Arguments) {
+ close(waitSubscribed)
+ }
+
+ waitAcked := make(chan struct{})
+ ack.RunFn = func(a mock.Arguments) {
+ close(waitAcked)
+ }
+
+ err := wsc.Send(context.Background(), []byte(`{
+ "type":"start",
+ "namespace":"ns1",
+ "ephemeral":true,
+ "autoack": true,
+ "options": {
+ "batch": true
+ }
+ }`))
+ assert.NoError(t, err)
+
+ <-waitSubscribed
+ sub := &core.Subscription{
+ SubscriptionRef: core.SubscriptionRef{ID: fftypes.NewUUID(), Namespace: "ns1", Name: "sub1"},
+ }
+ ws.BatchDeliveryRequest(ws.ctx, connID, sub, []*core.CombinedEventDataDelivery{
+ {Event: &core.EventDelivery{
+ EnrichedEvent: core.EnrichedEvent{
+ Event: core.Event{ID: fftypes.NewUUID()},
+ },
+ Subscription: core.SubscriptionRef{
+ ID: fftypes.NewUUID(),
+ Namespace: "ns1",
+ },
+ }},
+ })
+
+ b := <-wsc.Receive()
+ var res core.EventDelivery
+ err = json.Unmarshal(b, &res)
+ assert.NoError(t, err)
+
+ <-waitAcked
+ cbs.AssertExpectations(t)
+}
+
func TestStartReceiveDurable(t *testing.T) {
cbs := &eventsmocks.Callbacks{}
ws, wsc, cancel := newTestWebsockets(t, cbs, nil)
@@ -243,12 +305,13 @@ func TestStartReceiveDurable(t *testing.T) {
var connID string
sub := cbs.On("RegisterConnection",
mock.MatchedBy(func(s string) bool { connID = s; return true }),
- mock.MatchedBy(func(subMatch events.SubscriptionMatcher) bool {
- return subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub1"}) &&
- !subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}) &&
- !subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"})
- }),
- ).Return(nil)
+ mock.Anything,
+ ).Return(nil).Run(func(args mock.Arguments) {
+ subMatch := args[1].(events.SubscriptionMatcher)
+ assert.True(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub1"}))
+ assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}))
+ assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"}))
+ })
ack := cbs.On("DeliveryResponse",
mock.MatchedBy(func(s string) bool { return s == connID }),
mock.Anything).Return(nil)
@@ -316,28 +379,107 @@ func TestStartReceiveDurable(t *testing.T) {
cbs.AssertExpectations(t)
}
-func TestStartReceiveDurableWithAuth(t *testing.T) {
+func TestStartReceiveDurableBatch(t *testing.T) {
cbs := &eventsmocks.Callbacks{}
- ws, wsc, cancel := newTestWebsockets(t, cbs, &testAuthorizer{})
+ ws, wsc, cancel := newTestWebsockets(t, cbs, nil)
defer cancel()
var connID string
- sub := cbs.On("RegisterConnection",
+ mrg := cbs.On("RegisterConnection",
mock.MatchedBy(func(s string) bool { connID = s; return true }),
- mock.MatchedBy(func(subMatch events.SubscriptionMatcher) bool {
- return subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub1"}) &&
- !subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}) &&
- !subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"})
- }),
- ).Return(nil)
+ mock.Anything,
+ ).Return(nil).Run(func(args mock.Arguments) {
+ subMatch := args[1].(events.SubscriptionMatcher)
+ assert.True(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub1"}))
+ assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}))
+ assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"}))
+ })
ack := cbs.On("DeliveryResponse",
mock.MatchedBy(func(s string) bool { return s == connID }),
mock.Anything).Return(nil)
waitSubscribed := make(chan struct{})
- sub.RunFn = func(a mock.Arguments) {
+ mrg.RunFn = func(a mock.Arguments) {
close(waitSubscribed)
}
+ acks := make(chan *core.EventDeliveryResponse)
+ ack.RunFn = func(a mock.Arguments) {
+ acks <- a[1].(*core.EventDeliveryResponse)
+ }
+
+ err := wsc.Send(context.Background(), []byte(`{"type":"start","namespace":"ns1","name":"sub1"}`))
+ assert.NoError(t, err)
+
+ <-waitSubscribed
+ sub := &core.Subscription{
+ SubscriptionRef: core.SubscriptionRef{ID: fftypes.NewUUID(), Namespace: "ns1", Name: "sub1"},
+ }
+ event1ID := fftypes.NewUUID()
+ event2ID := fftypes.NewUUID()
+ ws.BatchDeliveryRequest(ws.ctx, connID, sub, []*core.CombinedEventDataDelivery{
+ {
+ Event: &core.EventDelivery{
+ EnrichedEvent: core.EnrichedEvent{
+ Event: core.Event{ID: event1ID},
+ },
+ Subscription: sub.SubscriptionRef,
+ },
+ },
+ {
+ Event: &core.EventDelivery{
+ EnrichedEvent: core.EnrichedEvent{
+ Event: core.Event{ID: event2ID},
+ },
+ Subscription: sub.SubscriptionRef,
+ },
+ },
+ })
+
+ b := <-wsc.Receive()
+ var deliveredBatch core.WSEventBatch
+ err = json.Unmarshal(b, &deliveredBatch)
+ assert.NoError(t, err)
+ assert.Len(t, deliveredBatch.Events, 2)
+ assert.Equal(t, "ns1", deliveredBatch.Subscription.Namespace)
+ assert.Equal(t, "sub1", deliveredBatch.Subscription.Name)
+ err = wsc.Send(context.Background(), []byte(fmt.Sprintf(`{
+ "type":"ack",
+ "id": "%s",
+ "subscription": {
+ "namespace": "ns1",
+ "name": "sub1"
+ }
+ }`, deliveredBatch.ID)))
+ assert.NoError(t, err)
+
+ ack1 := <-acks
+ assert.Equal(t, *event1ID, *ack1.ID)
+ ack2 := <-acks
+ assert.Equal(t, *event2ID, *ack2.ID)
+
+ cbs.AssertExpectations(t)
+}
+
+func TestStartReceiveDurableWithAuth(t *testing.T) {
+ cbs := &eventsmocks.Callbacks{}
+ ws, wsc, cancel := newTestWebsockets(t, cbs, &testAuthorizer{})
+ defer cancel()
+ var connID string
+ waitSubscribed := make(chan struct{})
+ cbs.On("RegisterConnection",
+ mock.MatchedBy(func(s string) bool { connID = s; return true }),
+ mock.Anything,
+ ).Run(func(args mock.Arguments) {
+ subMatch := args[1].(events.SubscriptionMatcher)
+ assert.True(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub1"}))
+ assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}))
+ assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"}))
+ close(waitSubscribed)
+ }).Return(nil)
+ ack := cbs.On("DeliveryResponse",
+ mock.MatchedBy(func(s string) bool { return s == connID }),
+ mock.Anything).Return(nil)
+
waitAcked := make(chan struct{})
ack.RunFn = func(a mock.Arguments) {
close(waitAcked)
@@ -401,22 +543,20 @@ func TestStartReceiveDurableUnauthorized(t *testing.T) {
_, wsc, cancel := newTestWebsockets(t, cbs, &testAuthorizer{})
defer cancel()
var connID string
- sub := cbs.On("RegisterConnection",
+ waitSubscribed := make(chan struct{})
+ cbs.On("RegisterConnection",
mock.MatchedBy(func(s string) bool { connID = s; return true }),
- mock.MatchedBy(func(subMatch events.SubscriptionMatcher) bool {
- return subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}) &&
- !subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"})
- }),
- ).Return(nil)
+ mock.Anything,
+ ).Return(nil).Run(func(args mock.Arguments) {
+ subMatch := args[1].(events.SubscriptionMatcher)
+ assert.True(t, subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}))
+ assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"}))
+ close(waitSubscribed)
+ })
ack := cbs.On("DeliveryResponse",
mock.MatchedBy(func(s string) bool { return s == connID }),
mock.Anything).Return(nil)
- waitSubscribed := make(chan struct{})
- sub.RunFn = func(a mock.Arguments) {
- close(waitSubscribed)
- }
-
waitAcked := make(chan struct{})
ack.RunFn = func(a mock.Arguments) {
close(waitAcked)
@@ -435,18 +575,18 @@ func TestStartReceiveDurableUnauthorized(t *testing.T) {
func TestAutoStartReceiveAckEphemeral(t *testing.T) {
var connID string
cbs := &eventsmocks.Callbacks{}
- sub := cbs.On("EphemeralSubscription",
+ waitSubscribed := make(chan struct{})
+ cbs.On("EphemeralSubscription",
mock.MatchedBy(func(s string) bool { connID = s; return true }),
- "ns1", mock.Anything, mock.Anything).Return(nil)
+ "ns1", mock.Anything, mock.Anything).
+ Return(nil).
+ Run(func(args mock.Arguments) {
+ close(waitSubscribed)
+ })
ack := cbs.On("DeliveryResponse",
mock.MatchedBy(func(s string) bool { return s == connID }),
mock.Anything).Return(nil)
- waitSubscribed := make(chan struct{})
- sub.RunFn = func(a mock.Arguments) {
- close(waitSubscribed)
- }
-
waitAcked := make(chan struct{})
ack.RunFn = func(a mock.Arguments) {
close(waitAcked)
@@ -478,6 +618,55 @@ func TestAutoStartReceiveAckEphemeral(t *testing.T) {
cbs.AssertExpectations(t)
}
+func TestAutoStartReceiveAckBatchEphemeral(t *testing.T) {
+ var connID string
+ cbs := &eventsmocks.Callbacks{}
+ waitSubscribed := make(chan struct{})
+ cbs.On("EphemeralSubscription",
+ mock.MatchedBy(func(s string) bool { connID = s; return true }),
+ "ns1", mock.Anything, mock.Anything).
+ Return(nil).
+ Run(func(args mock.Arguments) {
+ close(waitSubscribed)
+ })
+ ack := cbs.On("DeliveryResponse",
+ mock.MatchedBy(func(s string) bool { return s == connID }),
+ mock.Anything).Return(nil)
+
+ waitAcked := make(chan struct{})
+ ack.RunFn = func(a mock.Arguments) {
+ close(waitAcked)
+ }
+
+ ws, wsc, cancel := newTestWebsockets(t, cbs, nil, "ephemeral", "namespace=ns1", "batch")
+ defer cancel()
+
+ <-waitSubscribed
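+ // Simulate the event dispatcher delivering a single-event batch to this connection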
+ ws.BatchDeliveryRequest(ws.ctx, connID, nil, []*core.CombinedEventDataDelivery{
+ {Event: &core.EventDelivery{
+ EnrichedEvent: core.EnrichedEvent{
+ Event: core.Event{ID: fftypes.NewUUID()},
+ },
+ Subscription: core.SubscriptionRef{
+ ID: fftypes.NewUUID(),
+ Namespace: "ns1",
+ },
+ }},
+ })
+
+ b := <-wsc.Receive()
+ var deliveredBatch core.WSEventBatch
+ err := json.Unmarshal(b, &deliveredBatch)
+ assert.NoError(t, err)
+ assert.Len(t, deliveredBatch.Events, 1)
+
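+ // A single ack referencing the batch ID acknowledges every event in the batch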
+ err = wsc.Send(context.Background(), []byte(`{"type":"ack", "id": "`+deliveredBatch.ID.String()+`"}`))
+ assert.NoError(t, err)
+
+ <-waitAcked
+ cbs.AssertExpectations(t)
+}
+
func TestAutoStartBadOptions(t *testing.T) {
cbs := &eventsmocks.Callbacks{}
_, wsc, cancel := newTestWebsockets(t, cbs, nil, "name=missingnamespace")
@@ -491,6 +680,29 @@ func TestAutoStartBadOptions(t *testing.T) {
cbs.AssertExpectations(t)
}
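+
+// TestAutoStartCustomReadAheadBatch verifies that the readahead and batchtimeout URL
+// query parameters are parsed and propagated into the ephemeral subscription options.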
+func TestAutoStartCustomReadAheadBatch(t *testing.T) {
+ cbs := &eventsmocks.Callbacks{}
+
+ subscribedConn := make(chan string, 1)
+ cbs.On("EphemeralSubscription",
+ mock.MatchedBy(func(s string) bool {
+ subscribedConn <- s
+ return true
+ }),
+ "ns1",
+ mock.Anything,
+ mock.MatchedBy(func(o *core.SubscriptionOptions) bool {
+ return *o.ReadAhead == 42 && *o.BatchTimeout == "1s"
+ }),
+ ).Return(nil)
+
+ _, _, cancel := newTestWebsockets(t, cbs, nil, "namespace=ns1", "ephemeral", "batch", "batchtimeout=1s", "readahead=42")
+ defer cancel()
+
+ <-subscribedConn
+}
+
func TestAutoStartBadNamespace(t *testing.T) {
cbs := &eventsmocks.Callbacks{}
_, wsc, cancel := newTestWebsockets(t, cbs, nil, "ephemeral", "namespace=ns2")
@@ -523,6 +735,30 @@ func TestHandleAckWithAutoAck(t *testing.T) {
assert.Regexp(t, "FF10180", err)
}
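+
+// TestHandleBatchNotMatch verifies that an ack matching neither an inflight event nor
+// an inflight batch is rejected, leaving both still inflight.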
+func TestHandleBatchNotMatch(t *testing.T) {
+ eventUUID := fftypes.NewUUID()
+ wsc := &websocketConnection{
+ ctx: context.Background(),
+ started: []*websocketStartedSub{{WSStart: core.WSStart{
+ Ephemeral: false, Name: "name1", Namespace: "ns1",
+ }}},
+ sendMessages: make(chan interface{}, 1),
+ inflight: []*core.EventDeliveryResponse{
+ {ID: fftypes.NewUUID()},
+ },
+ inflightBatches: []*core.WSEventBatch{
+ {ID: fftypes.NewUUID()},
+ },
+ }
+ err := wsc.handleAck(&core.WSAck{
+ ID: eventUUID,
+ })
+ assert.Regexp(t, "FF10180", err)
+ assert.Len(t, wsc.inflight, 1)
+ assert.Len(t, wsc.inflightBatches, 1)
+}
+
func TestHandleStartFlippingAutoAck(t *testing.T) {
eventUUID := fftypes.NewUUID()
wsc := &websocketConnection{
@@ -665,6 +901,16 @@ func TestConnectionDispatchAfterClose(t *testing.T) {
assert.Regexp(t, "FF00147", err)
}
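+
+// TestConnectionDispatchBatchAfterClose verifies that dispatchBatch fails fast, rather
+// than blocking, once the connection context has been cancelled.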
+func TestConnectionDispatchBatchAfterClose(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel()
+ wsc := &websocketConnection{
+ ctx: ctx,
+ }
+ err := wsc.dispatchBatch(&core.Subscription{}, []*core.CombinedEventDataDelivery{})
+ assert.Regexp(t, "FF00147", err)
+}
+
func TestWebsocketDispatchAfterClose(t *testing.T) {
ws := &WebSockets{
ctx: context.Background(),
@@ -674,6 +920,15 @@ func TestWebsocketDispatchAfterClose(t *testing.T) {
assert.Regexp(t, "FF10173", err)
}
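+
+// TestWebsocketBatchDispatchAfterClose verifies that a batch delivery request for a
+// connection ID that is no longer registered returns an error.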
+func TestWebsocketBatchDispatchAfterClose(t *testing.T) {
+ ws := &WebSockets{
+ ctx: context.Background(),
+ connections: make(map[string]*websocketConnection),
+ }
+ err := ws.BatchDeliveryRequest(ws.ctx, "gone", nil, []*core.CombinedEventDataDelivery{})
+ assert.Regexp(t, "FF10173", err)
+}
+
func TestDispatchAutoAck(t *testing.T) {
cbs := &eventsmocks.Callbacks{}
cbs.On("DeliveryResponse", mock.Anything, mock.Anything).Return(nil)
@@ -826,21 +1081,6 @@ func TestNamespaceRestartedFailClose(t *testing.T) {
mcb.AssertExpectations(t)
}
-func TestEventDeliveryBatchReturnsUnsupported(t *testing.T) {
- cbs := &eventsmocks.Callbacks{}
- ws, _, cancel := newTestWebsockets(t, cbs, nil)
- defer cancel()
-
- sub := &core.Subscription{
- SubscriptionRef: core.SubscriptionRef{
- Namespace: "ns1",
- },
- }
-
- err := ws.BatchDeliveryRequest(ws.ctx, "id", sub, []*core.CombinedEventDataDelivery{})
- assert.Regexp(t, "FF10461", err)
-}
-
func TestNamespaceScopedSendWrongNamespaceStartAction(t *testing.T) {
cbs := &eventsmocks.Callbacks{}
_, wsc, cancel := newTestWebsocketsCommon(t, cbs, nil, "ns1")
@@ -889,23 +1129,22 @@ func TestNamespaceScopedSuccess(t *testing.T) {
ws, wsc, cancel := newTestWebsocketsCommon(t, cbs, nil, "ns1")
defer cancel()
var connID string
- sub := cbs.On("RegisterConnection",
+ waitSubscribed := make(chan struct{})
+
+ cbs.On("RegisterConnection",
mock.MatchedBy(func(s string) bool { connID = s; return true }),
- mock.MatchedBy(func(subMatch events.SubscriptionMatcher) bool {
- return subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub1"}) &&
- !subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}) &&
- !subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"})
- }),
- ).Return(nil)
+ mock.Anything,
+ ).Return(nil).Run(func(args mock.Arguments) {
+ subMatch := args[1].(events.SubscriptionMatcher)
+ assert.True(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub1"}))
+ assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}))
+ assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"}))
+ close(waitSubscribed)
+ })
ack := cbs.On("DeliveryResponse",
mock.MatchedBy(func(s string) bool { return s == connID }),
mock.Anything).Return(nil)
- waitSubscribed := make(chan struct{})
- sub.RunFn = func(a mock.Arguments) {
- close(waitSubscribed)
- }
-
waitAcked := make(chan struct{})
ack.RunFn = func(a mock.Arguments) {
close(waitAcked)
diff --git a/pkg/core/subscription.go b/pkg/core/subscription.go
index 7af6fc85e..e9e789d1d 100644
--- a/pkg/core/subscription.go
+++ b/pkg/core/subscription.go
@@ -1,4 +1,4 @@
-// Copyright © 2023 Kaleido, Inc.
+// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -88,6 +88,7 @@ const (
)
// SubscriptionCoreOptions are the core options that apply across all transports
+// REMEMBER TO ADD OPTIONS HERE TO MarshalJSON()
type SubscriptionCoreOptions struct {
FirstEvent *SubOptsFirstEvent `ffstruct:"SubscriptionCoreOptions" json:"firstEvent,omitempty"`
ReadAhead *uint16 `ffstruct:"SubscriptionCoreOptions" json:"readAhead,omitempty"`
@@ -167,6 +168,12 @@ func (so SubscriptionOptions) MarshalJSON() ([]byte, error) {
if so.TLSConfigName != "" {
so.additionalOptions["tlsConfigName"] = so.TLSConfigName
}
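+ // Flatten the new batch options into the same additionalOptions map as the other
+ // core options, so they round-trip through database serialization of the options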
+ if so.Batch != nil {
+ so.additionalOptions["batch"] = so.Batch
+ }
+ if so.BatchTimeout != nil {
+ so.additionalOptions["batchTimeout"] = so.BatchTimeout
+ }
return json.Marshal(&so.additionalOptions)
}
diff --git a/pkg/core/subscription_test.go b/pkg/core/subscription_test.go
index 0165dc593..45d1ebf00 100644
--- a/pkg/core/subscription_test.go
+++ b/pkg/core/subscription_test.go
@@ -28,12 +28,15 @@ func TestSubscriptionOptionsDatabaseSerialization(t *testing.T) {
firstEvent := SubOptsFirstEventNewest
readAhead := uint16(50)
yes := true
+ oneSec := "1s"
sub1 := &Subscription{
Options: SubscriptionOptions{
SubscriptionCoreOptions: SubscriptionCoreOptions{
- FirstEvent: &firstEvent,
- ReadAhead: &readAhead,
- WithData: &yes,
+ FirstEvent: &firstEvent,
+ ReadAhead: &readAhead,
+ WithData: &yes,
+ Batch: &yes,
+ BatchTimeout: &oneSec,
},
WebhookSubOptions: WebhookSubOptions{
TLSConfigName: "myconfig",
@@ -49,7 +52,18 @@ func TestSubscriptionOptionsDatabaseSerialization(t *testing.T) {
// Verify it serializes as bytes to the database
b1, err := sub1.Options.Value()
assert.NoError(t, err)
- assert.Equal(t, `{"firstEvent":"newest","my-nested-opts":{"myopt1":12345,"myopt2":"test"},"readAhead":50,"tlsConfigName":"myconfig","withData":true}`, string(b1.([]byte)))
+ assert.JSONEq(t, `{
+ "firstEvent":"newest",
+ "my-nested-opts":{
+ "myopt1":12345,
+ "myopt2":"test"
+ },
+ "readAhead":50,
+ "tlsConfigName":"myconfig",
+ "withData":true,
+ "batch":true,
+ "batchTimeout":"1s"
+ }`, string(b1.([]byte)))
f1, err := sub1.Filter.Value()
assert.NoError(t, err)
diff --git a/pkg/core/websocket_actions.go b/pkg/core/websocket_actions.go
index d02fb9940..551148946 100644
--- a/pkg/core/websocket_actions.go
+++ b/pkg/core/websocket_actions.go
@@ -1,4 +1,4 @@
-// Copyright © 2022 Kaleido, Inc.
+// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
@@ -29,6 +29,9 @@ var (
// WSProtocolErrorEventType is a special event "type" field for server to send the client, if it performs a ProtocolError
WSProtocolErrorEventType = fftypes.FFEnumValue("wstype", "protocol_error")
+
+ // WSEventBatchType is the type set when the message contains an array of events
+ WSEventBatchType = fftypes.FFEnumValue("wstype", "event_batch")
)
// WSActionBase is the base fields of all client actions sent on the websocket
@@ -61,3 +64,12 @@ type WSError struct {
Type WSClientPayloadType `ffstruct:"WSAck" json:"type" ffenum:"wstype"`
Error string `ffstruct:"WSAck" json:"error"`
}
+
+// WSEventBatch is used when batched delivery is enabled over the websocket, allowing
+// an array of events to be ack'd as a whole (rather than ack'ing individually)
+type WSEventBatch struct {
+ Type WSClientPayloadType `ffstruct:"WSEventBatch" json:"type" ffenum:"wstype"`
+ ID *fftypes.UUID `ffstruct:"WSEventBatch" json:"id"`
+ Subscription SubscriptionRef `ffstruct:"WSEventBatch" json:"subscription"`
+ Events []*EventDelivery `ffstruct:"WSEventBatch" json:"events"`
+}
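+
+// Wire format (illustrative values): the server delivers
+//
+//	{"type":"event_batch","id":"<batch-uuid>","subscription":{"namespace":"ns1","name":"sub1"},"events":[...]}
+//
+// and the client acknowledges the whole batch with
+//
+//	{"type":"ack","id":"<batch-uuid>"}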